You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/26 05:33:59 UTC

[dolphinscheduler] branch dev updated: Refactor worker execute task process (#11540)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1b120e3a59 Refactor worker execute task process (#11540)
1b120e3a59 is described below

commit 1b120e3a5959a277c1b363dcd2d9cb88fcc32cc9
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Aug 26 13:33:51 2022 +0800

    Refactor worker execute task process (#11540)
    
    * Refactor worker execute task process
---
 .../plugin/task/api/AbstractTask.java              |  10 +-
 .../plugin/task/api/AbstractYarnTask.java          |  11 +-
 .../plugin/task/api/k8s/AbstractK8sTask.java       |   2 +-
 .../plugin/task/chunjun/ChunJunTask.java           |  23 +-
 .../plugin/task/datax/DataxTask.java               |  48 +--
 .../plugin/task/dinky/DinkyTask.java               |  95 +++---
 .../dolphinscheduler/plugin/task/dvc/DvcTask.java  |  14 +-
 .../plugin/task/emr/EmrAddStepsTask.java           |  17 +-
 .../plugin/task/emr/EmrJobFlowTask.java            |   6 +-
 .../plugin/task/http/HttpTask.java                 |   5 +-
 .../plugin/task/jupyter/JupyterTask.java           |  24 +-
 .../plugin/task/mlflow/MlflowTask.java             |  16 +-
 .../plugin/task/pigeon/PigeonTask.java             |  12 +-
 .../plugin/task/procedure/ProcedureTask.java       |  12 +-
 .../plugin/task/python/PythonTask.java             |   2 +-
 .../plugin/task/pytorch/PytorchTask.java           |   9 +-
 .../plugin/task/sagemaker/SagemakerTask.java       |   5 +-
 .../plugin/task/seatunnel/SeatunnelTask.java       |  21 +-
 .../plugin/task/shell/ShellTask.java               |  19 +-
 .../dolphinscheduler/plugin/task/sql/SqlTask.java  |  15 +-
 .../plugin/task/zeppelin/ZeppelinTask.java         |  11 +-
 .../plugin/task/zeppelin/ZeppelinTaskTest.java     |   5 +-
 .../worker/processor/TaskDispatchProcessor.java    | 128 ++------
 .../server/worker/processor/TaskKillProcessor.java |  27 +-
 .../worker/processor/TaskSavePointProcessor.java   |  19 +-
 .../DefaultWorkerDelayTaskExecuteRunnable.java     |  60 ++++
 ...faultWorkerDelayTaskExecuteRunnableFactory.java |  53 +++
 .../server/worker/runner/TaskExecuteThread.java    | 364 ---------------------
 .../runner/WorkerDelayTaskExecuteRunnable.java     |  61 ++++
 .../WorkerDelayTaskExecuteRunnableFactory.java     |  59 ++++
 .../server/worker/runner/WorkerExecService.java    |   8 +-
 .../server/worker/runner/WorkerManagerThread.java  |  60 ++--
 .../worker/runner/WorkerTaskExecuteRunnable.java   | 275 ++++++++++++++++
 .../runner/WorkerTaskExecuteRunnableFactory.java   |  23 ++
 .../WorkerTaskExecuteRunnableFactoryBuilder.java   |  50 +++
 .../worker/utils/TaskExecutionCheckerUtils.java    | 129 ++++++++
 .../processor/TaskDispatchProcessorTest.java       | 171 +++-------
 .../DefaultWorkerDelayTaskExecuteRunnableTest.java |  73 +++++
 .../worker/runner/TaskExecuteThreadTest.java       |  87 -----
 39 files changed, 1106 insertions(+), 923 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
index 80ee36b9d1..0b23905059 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
@@ -28,9 +28,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -103,12 +101,8 @@ public abstract class AbstractTask {
         return null;
     }
 
-    /**
-     * task handle
-     *
-     * @throws Exception exception
-     */
-    public abstract void handle() throws Exception;
+    public abstract void handle() throws TaskException;
+
 
     /**
      * cancel application
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 4f1913ba3d..c9a9c4851c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.plugin.task.api;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 
-import java.util.List;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /**
@@ -51,7 +49,7 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             // SHELL task exit code
             TaskResponse response = shellCommandExecutor.run(buildCommand());
@@ -59,10 +57,15 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
             // set appIds
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(response.getProcessId());
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            logger.info("The current yarn task has been interrupted", ex);
+            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+            throw new TaskException("The current yarn task has been interrupted", ex);
         } catch (Exception e) {
             logger.error("yarn process failure", e);
             exitStatusCode = -1;
-            throw e;
+            throw new TaskException("Execute task failed", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
index 8d7f7eac7f..a82e5f6bbd 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
@@ -39,7 +39,7 @@ public abstract class AbstractK8sTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             TaskResponse response = abstractK8sTaskExecutor.run(buildCommand());
             setExitStatusCode(response.getExitStatusCode());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index a416d0c02c..46fcf2ec2f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -17,12 +17,12 @@
 
 package org.apache.dolphinscheduler.plugin.task.chunjun;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
-
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.SystemUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -32,9 +32,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.enums.Flag;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.SystemUtils;
-
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -48,6 +45,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+
 /**
  * chunjun task
  */
@@ -101,10 +101,10 @@ public class ChunJunTask extends AbstractTaskExecutor {
     /**
      * run chunjun process
      *
-     * @throws Exception exception
+     * @throws TaskException exception
      */
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
 
@@ -115,10 +115,15 @@ public class ChunJunTask extends AbstractTaskExecutor {
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(commandExecuteResult.getProcessId());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("The current ChunJun Task has been interrupted", e);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw new TaskException("The current ChunJun Task has been interrupted", e);
         } catch (Exception e) {
             logger.error("chunjun task failed.", e);
             setExitStatusCode(EXIT_CODE_FAILURE);
-            throw e;
+            throw new TaskException("Execute chunjun task failed", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index 08a8c8acc8..873d8f7424 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -17,32 +17,38 @@
 
 package org.apache.dolphinscheduler.plugin.task.datax;
 
-import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
-
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
+import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
+import com.alibaba.druid.sql.ast.statement.SQLSelect;
+import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
+import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
+import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
+import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
+import com.alibaba.druid.sql.parser.SQLStatementParser;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.enums.Flag;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SystemUtils;
-
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -58,7 +64,6 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -66,17 +71,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
-import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
-import com.alibaba.druid.sql.ast.statement.SQLSelect;
-import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
-import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
-import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
-import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
-import com.alibaba.druid.sql.parser.SQLStatementParser;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
 
 public class DataxTask extends AbstractTaskExecutor {
     /**
@@ -150,7 +147,7 @@ public class DataxTask extends AbstractTaskExecutor {
      * @throws Exception if error throws Exception
      */
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             // replace placeholder,and combine local and global parameters
             Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
@@ -163,10 +160,15 @@ public class DataxTask extends AbstractTaskExecutor {
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(commandExecuteResult.getProcessId());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("The current DataX task has been interrupted", e);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw new TaskException("The current DataX task has been interrupted", e);
         } catch (Exception e) {
             logger.error("datax task error", e);
             setExitStatusCode(EXIT_CODE_FAILURE);
-            throw e;
+            throw new TaskException("Execute DataX task failed", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
index 622a674780..987126b0a7 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
@@ -17,13 +17,17 @@
 
 package org.apache.dolphinscheduler.plugin.task.dinky;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.MissingNode;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.HttpClient;
@@ -39,10 +43,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.MissingNode;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
 
 public class DinkyTask extends AbstractTaskExecutor {
 
@@ -77,47 +78,55 @@ public class DinkyTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
-        String address = this.dinkyParameters.getAddress();
-        String taskId = this.dinkyParameters.getTaskId();
-        boolean isOnline = this.dinkyParameters.isOnline();
-        JsonNode result;
-        if (isOnline) {
-            // Online dinky task, and only one job is allowed to execute
-            result = onlineTask(address, taskId);
-        } else {
-            // Submit dinky task
-            result = submitTask(address, taskId);
-        }
-        if (checkResult(result)) {
-            boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean();
-            String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText();
-            boolean finishFlag = false;
-            while (!finishFlag) {
-                JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
-                if (!checkResult(jobInstanceInfoResult)) {
-                    break;
-                }
-                String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
-                switch (jobInstanceStatus) {
-                    case DinkyTaskConstants.STATUS_FINISHED:
-                        final int exitStatusCode = mapStatusToExitCode(status);
-                        // Use address-taskId as app id
-                        setAppIds(String.format("%s-%s", address, taskId));
-                        setExitStatusCode(exitStatusCode);
-                        logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS));
-                        finishFlag = true;
-                        break;
-                    case DinkyTaskConstants.STATUS_FAILED:
-                    case DinkyTaskConstants.STATUS_CANCELED:
-                    case DinkyTaskConstants.STATUS_UNKNOWN:
-                        errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText());
-                        finishFlag = true;
+    public void handle() throws TaskException {
+        try {
+
+            String address = this.dinkyParameters.getAddress();
+            String taskId = this.dinkyParameters.getTaskId();
+            boolean isOnline = this.dinkyParameters.isOnline();
+            JsonNode result;
+            if (isOnline) {
+                // Online dinky task, and only one job is allowed to execute
+                result = onlineTask(address, taskId);
+            } else {
+                // Submit dinky task
+                result = submitTask(address, taskId);
+            }
+            if (checkResult(result)) {
+                boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean();
+                String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText();
+                boolean finishFlag = false;
+                while (!finishFlag) {
+                    JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
+                    if (!checkResult(jobInstanceInfoResult)) {
                         break;
-                    default:
-                        Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
+                    }
+                    String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
+                    switch (jobInstanceStatus) {
+                        case DinkyTaskConstants.STATUS_FINISHED:
+                            final int exitStatusCode = mapStatusToExitCode(status);
+                            // Use address-taskId as app id
+                            setAppIds(String.format("%s-%s", address, taskId));
+                            setExitStatusCode(exitStatusCode);
+                            logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS));
+                            finishFlag = true;
+                            break;
+                        case DinkyTaskConstants.STATUS_FAILED:
+                        case DinkyTaskConstants.STATUS_CANCELED:
+                        case DinkyTaskConstants.STATUS_UNKNOWN:
+                            errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText());
+                            finishFlag = true;
+                            break;
+                        default:
+                            Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
+                    }
                 }
             }
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            logger.error("Execute dinkyTask failed", ex);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw new TaskException("Execute dinkyTask failed", ex);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index 8176999e4c..2e3bece309 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -17,11 +17,10 @@
 
 package org.apache.dolphinscheduler.plugin.task.dvc;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -30,6 +29,8 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+
 /**
  * shell task
  */
@@ -74,7 +75,7 @@ public class DvcTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             // construct process
             String command = buildCommand();
@@ -83,10 +84,15 @@ public class DvcTask extends AbstractTaskExecutor {
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(commandExecuteResult.getProcessId());
             parameters.dealOutParam(shellCommandExecutor.getVarPool());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("The current DvcTask has been interrupted", e);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw new TaskException("The current DvcTask has been interrupted", e);
         } catch (Exception e) {
             logger.error("dvc task error", e);
             setExitStatusCode(EXIT_CODE_FAILURE);
-            throw e;
+            throw new TaskException("Execute dvc task failed", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
index d747577b71..6f4fc95e48 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -17,12 +17,6 @@
 
 package org.apache.dolphinscheduler.plugin.task.emr;
 
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-
-import java.util.HashSet;
-import java.util.concurrent.TimeUnit;
-
 import com.amazonaws.SdkBaseException;
 import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
 import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
@@ -36,6 +30,12 @@ import com.amazonaws.services.elasticmapreduce.model.StepState;
 import com.amazonaws.services.elasticmapreduce.model.StepStatus;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Sets;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
 
 /**
  * AddJobFlowSteps task executor
@@ -62,7 +62,7 @@ public class EmrAddStepsTask extends AbstractEmrTask {
     }
 
     @Override
-    public void handle() throws InterruptedException {
+    public void handle() throws TaskException {
         StepStatus stepStatus = null;
         try {
             AddJobFlowStepsRequest addJobFlowStepsRequest = createAddJobFlowStepsRequest();
@@ -84,6 +84,9 @@ public class EmrAddStepsTask extends AbstractEmrTask {
 
         } catch (EmrTaskException | SdkBaseException e) {
             logger.error("emr task submit failed with error", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new TaskException("Execute emr task failed", e);
         } finally {
             final int exitStatusCode = calculateExitStatusCode(stepStatus);
             setExitStatusCode(exitStatusCode);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index ed42de2831..bc361f6538 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.plugin.task.emr;
 
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 
 import java.util.HashSet;
@@ -55,7 +56,7 @@ public class EmrJobFlowTask extends AbstractEmrTask {
     }
 
     @Override
-    public void handle() throws InterruptedException {
+    public void handle() throws TaskException {
         ClusterStatus clusterStatus = null;
         try {
             RunJobFlowRequest runJobFlowRequest = createRunJobFlowRequest();
@@ -76,6 +77,9 @@ public class EmrJobFlowTask extends AbstractEmrTask {
 
         } catch (EmrTaskException | SdkBaseException e) {
             logger.error("emr task submit failed with error", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new TaskException("Execute emr task failed", e);
         } finally {
             final int exitStatusCode = calculateExitStatusCode(clusterStatus);
             setExitStatusCode(exitStatusCode);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
index 47bb7e9b21..df17725cf2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.http;
 import static org.apache.dolphinscheduler.plugin.task.http.HttpTaskConstants.APPLICATION_JSON;
 
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -89,7 +90,7 @@ public class HttpTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         long startTime = System.currentTimeMillis();
         String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
         String statusCode = null;
@@ -108,7 +109,7 @@ public class HttpTask extends AbstractTaskExecutor {
             appendMessage(e.toString());
             exitStatusCode = -1;
             logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:" + output, e);
-            throw e;
+            throw new TaskException("Execute http task failed", e);
         }
 
     }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 17f4946e19..ab135ac99e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -17,28 +17,27 @@
 
 package org.apache.dolphinscheduler.plugin.task.jupyter;
 
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
-import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.utils.DateUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 public class JupyterTask extends AbstractTaskExecutor {
 
     /**
@@ -78,17 +77,22 @@ public class JupyterTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             // SHELL task exit code
             TaskResponse response = shellCommandExecutor.run(buildCommand());
             setExitStatusCode(response.getExitStatusCode());
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(response.getProcessId());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("The current Jupyter task has been interrupted", e);
+            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+            throw new TaskException("The current Jupyter task has been interrupted", e);
         } catch (Exception e) {
             logger.error("jupyter task execution failure", e);
             exitStatusCode = -1;
-            throw e;
+            throw new TaskException("Execute jupyter task failed", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index 6e49da322e..0c42a5c09b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -17,12 +17,11 @@
 
 package org.apache.dolphinscheduler.plugin.task.mlflow;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -36,6 +35,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+
 /**
  * shell task
  */
@@ -80,7 +81,7 @@ public class MlflowTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             // construct process
             String command = buildCommand();
@@ -95,10 +96,15 @@ public class MlflowTask extends AbstractTaskExecutor {
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(commandExecuteResult.getProcessId());
             mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("The current Mlflow task has been interrupted", e);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw new TaskException("The current Mlflow task has been interrupted", e);
         } catch (Exception e) {
-            logger.error("shell task error", e);
+            logger.error("Mlflow task error", e);
             setExitStatusCode(EXIT_CODE_FAILURE);
-            throw e;
+            throw new TaskException("Execute Mlflow task failed", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
index 028eddb1c1..e5771d588d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
@@ -17,14 +17,14 @@
 
 package org.apache.dolphinscheduler.plugin.task.pigeon;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -33,6 +33,8 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
 
 import java.net.HttpURLConnection;
 import java.net.URI;
@@ -42,9 +44,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-import org.java_websocket.client.WebSocketClient;
-import org.java_websocket.handshake.ServerHandshake;
-
 /**
  * TIS DataX Task
  **/
@@ -74,7 +73,7 @@ public class PigeonTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         // Trigger PIGEON DataX pipeline
         logger.info("start execute PIGEON task");
         long startTime = System.currentTimeMillis();
@@ -150,6 +149,7 @@ public class PigeonTask extends AbstractTaskExecutor {
             if (e instanceof InterruptedException) {
                 Thread.currentThread().interrupt();
             }
+            throw new TaskException("Execute pigeon task failed", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index fc886983e5..02e960ebdb 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -17,19 +17,16 @@
 
 package org.apache.dolphinscheduler.plugin.task.procedure;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
-
 import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -44,6 +41,9 @@ import java.sql.Types;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+
 /**
  * procedure task
  */
@@ -84,7 +84,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}",
                 procedureParameters.getType(),
                 procedureParameters.getDatasource(),
@@ -123,7 +123,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
         } catch (Exception e) {
             setExitStatusCode(EXIT_CODE_FAILURE);
             logger.error("procedure task error", e);
-            throw e;
+            throw new TaskException("Execute procedure task failed", e);
         } finally {
             close(stmt, connection);
         }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 75dae471e4..6c18d206d9 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -100,7 +100,7 @@ public class PythonTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             // generate the content of this python script
             String pythonScriptContent = buildPythonScriptContent();
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index b738194d4e..3cd7c7c4f8 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -65,7 +65,7 @@ public class PytorchTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             String command = buildPythonExecuteCommand();
             TaskResponse taskResponse = shellCommandExecutor.run(command);
@@ -73,9 +73,14 @@ public class PytorchTask extends AbstractTaskExecutor {
             setAppIds(taskResponse.getAppIds());
             setProcessId(taskResponse.getProcessId());
             setVarPool(shellCommandExecutor.getVarPool());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("The current Pytorch task has been interrupted", e);
+            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+            throw new TaskException("The current Pytorch task has been interrupted", e);
         } catch (Exception e) {
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
-            throw e;
+            throw new TaskException("Pytorch task execute failed", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
index 1431da1e92..2543e60614 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
@@ -24,6 +24,7 @@ import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_G
 
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
@@ -80,13 +81,13 @@ public class SagemakerTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws SagemakerTaskException {
+    public void handle() throws TaskException {
         try {
             int exitStatusCode = handleStartPipeline();
             setExitStatusCode(exitStatusCode);
         } catch (Exception e) {
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
-            throw new SagemakerTaskException("SageMaker task error", e);
+            throw new TaskException("SageMaker task error", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index b44bc6eeac..cd3d8ebb22 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -17,12 +17,12 @@
 
 package org.apache.dolphinscheduler.plugin.task.seatunnel;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
-
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.BooleanUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -30,9 +30,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.BooleanUtils;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -42,6 +39,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
+
 /**
  * seatunnel task
  */
@@ -85,7 +85,7 @@ public class SeatunnelTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             // construct process
             String command = buildCommand();
@@ -94,10 +94,15 @@ public class SeatunnelTask extends AbstractTaskExecutor {
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(commandExecuteResult.getProcessId());
             seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("The current SeaTunnel task has been interrupted", e);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw new TaskException("The current SeaTunnel task has been interrupted", e);
         } catch (Exception e) {
             logger.error("SeaTunnel task error", e);
             setExitStatusCode(EXIT_CODE_FAILURE);
-            throw e;
+            throw new TaskException("Execute Seatunnel task failed", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index f333e617b5..7e20bed634 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -17,12 +17,11 @@
 
 package org.apache.dolphinscheduler.plugin.task.shell;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
-
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -31,8 +30,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
-import org.apache.commons.lang3.SystemUtils;
-
 import java.io.File;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
@@ -44,6 +41,9 @@ import java.nio.file.attribute.PosixFilePermissions;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+
 /**
  * shell task
  */
@@ -90,7 +90,7 @@ public class ShellTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             // construct process
             String command = buildCommand();
@@ -99,10 +99,15 @@ public class ShellTask extends AbstractTaskExecutor {
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(commandExecuteResult.getProcessId());
             shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("The current Shell task has been interrupted", e);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw new TaskException("The current Shell task has been interrupted", e);
         } catch (Exception e) {
             logger.error("shell task error", e);
             setExitStatusCode(EXIT_CODE_FAILURE);
-            throw e;
+            throw new TaskException("Execute shell task error", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 6e4e6f14c9..f884f46e5e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -17,6 +17,9 @@
 
 package org.apache.dolphinscheduler.plugin.task.sql;
 
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
@@ -39,8 +42,7 @@ import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -58,11 +60,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 public class SqlTask extends AbstractTaskExecutor {
 
     /**
@@ -117,7 +114,7 @@ public class SqlTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         logger.info("Full sql parameters: {}", sqlParameters);
         logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit  {}",
                 sqlParameters.getType(),
@@ -163,7 +160,7 @@ public class SqlTask extends AbstractTaskExecutor {
         } catch (Exception e) {
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
             logger.error("sql task error", e);
-            throw e;
+            throw new TaskException("Execute sql task failed", e);
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index f46b7ce56c..f51b352d9c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -17,13 +17,15 @@
 
 package org.apache.dolphinscheduler.plugin.task.zeppelin;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import kong.unirest.Unirest;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.spi.utils.DateUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-
 import org.apache.zeppelin.client.ClientConfig;
 import org.apache.zeppelin.client.NoteResult;
 import org.apache.zeppelin.client.ParagraphResult;
@@ -34,10 +36,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import kong.unirest.Unirest;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 public class ZeppelinTask extends AbstractTaskExecutor {
 
     /**
@@ -77,7 +75,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
     }
 
     @Override
-    public void handle() throws Exception {
+    public void handle() throws TaskException {
         try {
             final String paragraphId = this.zeppelinParameters.getParagraphId();
             final String productionNoteDirectory = this.zeppelinParameters.getProductionNoteDirectory();
@@ -142,6 +140,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
         } catch (Exception e) {
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
             logger.error("zeppelin task submit failed with error", e);
+            throw new TaskException("Execute ZeppelinTask exception", e);
         }
 
     }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 397d68347e..ff70ba5b45 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -28,6 +28,7 @@ import static org.powermock.api.mockito.PowerMockito.spy;
 import static org.powermock.api.mockito.PowerMockito.when;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.utils.DateUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@@ -127,10 +128,10 @@ public class ZeppelinTaskTest {
         Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
     }
 
-    @Test
+    @Test(expected = TaskException.class)
     public void testHandleWithParagraphExecutionException() throws Exception {
         when(this.zClient.executeParagraph(any(), any(), any(Map.class))).
-                thenThrow(new Exception("Something wrong happens from zeppelin side"));
+                thenThrow(new TaskException("Something wrong happens from zeppelin side"));
 //        when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
         this.zeppelinTask.handle();
         Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
index 17af3e9a47..4bb0c92124 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
@@ -17,14 +17,14 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
-import org.apache.dolphinscheduler.common.Constants;
+import com.google.common.base.Preconditions;
+import io.micrometer.core.annotation.Counted;
+import io.micrometer.core.annotation.Timed;
+import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -36,26 +36,16 @@ import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerDelayTaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnableFactoryBuilder;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-
-import org.apache.commons.lang.SystemUtils;
-
-import java.util.Date;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import com.google.common.base.Preconditions;
-
-import io.micrometer.core.annotation.Counted;
-import io.micrometer.core.annotation.Timed;
-import io.netty.channel.Channel;
-
 /**
  * Used to handle {@link CommandType#TASK_DISPATCH_REQUEST}
  */
@@ -104,7 +94,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
             logger.error("task execute request command content is null");
             return;
         }
-        final String masterAddress = taskDispatchCommand.getMessageSenderAddress();
+        final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress();
         logger.info("task execute request message: {}", taskDispatchCommand);
 
         TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
@@ -114,111 +104,39 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
             return;
         }
         try {
-            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
-                    taskExecutionContext.getTaskInstanceId());
-
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
             TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
-
             // set cache, it will be used when kill task
             TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
-
-            // todo custom logger
-
             taskExecutionContext.setHost(workerConfig.getWorkerAddress());
             taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
 
-            if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
-                boolean osUserExistFlag;
-                // if Using distributed is true and Currently supported systems are linux,Should not let it
-                // automatically
-                // create tenants,so TenantAutoCreate has no effect
-                if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
-                    // use the id command to judge in linux
-                    osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
-                } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
-                    // if not exists this user, then create
-                    OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
-                    osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
-                } else {
-                    osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
-                }
-
-                // check if the OS user exists
-                if (!osUserExistFlag) {
-                    logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
-                            taskExecutionContext.getTenantCode(),
-                            taskExecutionContext.getTaskInstanceId());
-                    TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-                    taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
-                    taskExecutionContext.setEndTime(new Date());
-                    workerMessageSender.sendMessageWithRetry(taskExecutionContext,
-                            masterAddress,
-                            CommandType.TASK_EXECUTE_RESULT);
-                    return;
-                }
-
-                // local execute path
-                String execLocalPath = getExecLocalPath(taskExecutionContext);
-                logger.info("task instance local execute path : {}", execLocalPath);
-                taskExecutionContext.setExecutePath(execLocalPath);
-
-                try {
-                    FileUtils.createWorkDirIfAbsent(execLocalPath);
-                } catch (Throwable ex) {
-                    logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
-                            execLocalPath,
-                            taskExecutionContext.getTaskInstanceId(),
-                            ex);
-                    TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-                    taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
-                    workerMessageSender.sendMessageWithRetry(taskExecutionContext,
-                            masterAddress,
-                            CommandType.TASK_EXECUTE_RESULT);
-                    return;
-                }
-            }
-
             // delay task process
-            long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
-                    taskExecutionContext.getDelayTime() * 60L);
+            long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
             if (remainTime > 0) {
-                logger.info("delay the execution of task instance {}, delay time: {} s",
-                        taskExecutionContext.getTaskInstanceId(),
-                        remainTime);
+                logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
                 taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
-                taskExecutionContext.setStartTime(null);
-                workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
+                workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress, CommandType.TASK_EXECUTE_RESULT);
             }
 
+            WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder.createWorkerDelayTaskExecuteRunnableFactory(
+                            taskExecutionContext,
+                            workerConfig,
+                            workflowMasterAddress,
+                            workerMessageSender,
+                            alertClientService,
+                            taskPluginManager,
+                            storageOperate)
+                    .createWorkerTaskExecuteRunnable();
             // submit task to manager
-            boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
-                    masterAddress,
-                    workerMessageSender,
-                    alertClientService,
-                    taskPluginManager,
-                    storageOperate));
+            boolean offer = workerManager.offer(workerTaskExecuteRunnable);
             if (!offer) {
-                logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
-                        workerManager.getWaitSubmitQueueSize(),
-                        taskExecutionContext.getTaskInstanceId());
-                workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_REJECT);
+                logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize());
+                workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT);
             }
         } finally {
             LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
         }
     }
 
-    /**
-     * get execute local path
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return execute local path
-     */
-    private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
-        return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
-                taskExecutionContext.getProcessDefineCode(),
-                taskExecutionContext.getProcessDefineVersion(),
-                taskExecutionContext.getProcessInstanceId(),
-                taskExecutionContext.getTaskInstanceId());
-    }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index f4f85277df..fd7e1d726c 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,6 +17,11 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
@@ -34,25 +39,17 @@ import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.Pair;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
-import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
 import org.apache.dolphinscheduler.service.log.LogClientService;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * task kill processor
@@ -161,12 +158,12 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      * @param taskInstanceId
      */
     protected void cancelApplication(int taskInstanceId) {
-        TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
-        if (taskExecuteThread == null) {
+        WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId);
+        if (workerTaskExecuteRunnable == null) {
             logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
             return;
         }
-        AbstractTask task = taskExecuteThread.getTask();
+        AbstractTask task = workerTaskExecuteRunnable.getTask();
         if (task == null) {
             logger.warn("task not found, taskInstanceId:{}", taskInstanceId);
             return;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
index 5401c66859..899ac7d9b7 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
@@ -17,6 +17,10 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -27,20 +31,13 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand;
 import org.apache.dolphinscheduler.remote.command.TaskSavePointResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import com.google.common.base.Preconditions;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-
 /**
  * task save point processor
  */
@@ -98,12 +95,12 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
     }
 
     protected void doSavePoint(int taskInstanceId) {
-        TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
-        if (taskExecuteThread == null) {
+        WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId);
+        if (workerTaskExecuteRunnable == null) {
             logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
             return;
         }
-        AbstractTask task = taskExecuteThread.getTask();
+        AbstractTask task = workerTaskExecuteRunnable.getTask();
         if (task == null) {
             logger.warn("task not found, taskInstanceId:{}", taskInstanceId);
             return;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
new file mode 100644
index 0000000000..1b63b063e8
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import javax.annotation.Nullable;
+/** Default {@link WorkerDelayTaskExecuteRunnable}: its {@link #executeTask()} simply runs the task plugin's {@code handle()}. */
+public class DefaultWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteRunnable {
+    /** Forwards every argument unchanged to the superclass constructor; only {@code storageOperate} is declared {@code @Nullable}. */
+    public DefaultWorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
+                                                 @NonNull WorkerConfig workerConfig,
+                                                 @NonNull String workflowMaster,
+                                                 @NonNull WorkerMessageSender workerMessageSender,
+                                                 @NonNull AlertClientService alertClientService,
+                                                 @NonNull TaskPluginManager taskPluginManager,
+                                                 @Nullable StorageOperate storageOperate) {
+        super(taskExecutionContext, workerConfig, workflowMaster, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
+    }
+    /** Calls {@code task.handle()}; throws {@link TaskException} if {@code task} (not declared here, presumably inherited) is {@code null}. */
+    @Override
+    public void executeTask() throws TaskException {
+        if (task == null) {
+            throw new TaskException("The task plugin instance is not initialized");
+        }
+        task.handle();
+    }
+    /** Adds no behavior of its own: delegates straight to the superclass {@code afterExecute()}. */
+    @Override
+    protected void afterExecute() {
+        super.afterExecute();
+    }
+    /** Adds no behavior of its own: delegates straight to the superclass {@code afterThrowing(Throwable)}. */
+    @Override
+    protected void afterThrowing(Throwable throwable) throws TaskException {
+        super.afterThrowing(throwable);
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
new file mode 100644
index 0000000000..53de57ec68
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import javax.annotation.Nullable;
+/** Factory whose {@link #createWorkerTaskExecuteRunnable()} returns {@link DefaultWorkerDelayTaskExecuteRunnable} instances. */
+public class DefaultWorkerDelayTaskExecuteRunnableFactory extends WorkerDelayTaskExecuteRunnableFactory<DefaultWorkerDelayTaskExecuteRunnable> {
+    /** Forwards every argument unchanged to the superclass constructor; only {@code storageOperate} is declared {@code @Nullable}. */
+    protected DefaultWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext,
+                                                           @NonNull WorkerConfig workerConfig,
+                                                           @NonNull String workflowMasterAddress,
+                                                           @NonNull WorkerMessageSender workerMessageSender,
+                                                           @NonNull AlertClientService alertClientService,
+                                                           @NonNull TaskPluginManager taskPluginManager,
+                                                           @Nullable StorageOperate storageOperate) {
+        super(taskExecutionContext, workerConfig, workflowMasterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
+    }
+    /** Creates a new runnable on every call; the values passed are not declared in this class (presumably fields inherited from the superclass). */
+    @Override
+    public DefaultWorkerDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable() {
+        return new DefaultWorkerDelayTaskExecuteRunnable(
+                taskExecutionContext,
+                workerConfig,
+                workflowMasterAddress,
+                workerMessageSender,
+                alertClientService,
+                taskPluginManager,
+                storageOperate);
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
deleted file mode 100644
index 9b7e8c7e85..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import com.google.common.base.Strings;
-import lombok.NonNull;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
-import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
-import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.exceptions.ServiceException;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Paths;
-import java.util.*;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
-
-/**
- * task scheduler thread
- */
-public class TaskExecuteThread implements Runnable, Delayed {
-
-    /**
-     * logger
-     */
-    private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
-
-    /**
-     * task instance
-     */
-    private final TaskExecutionContext taskExecutionContext;
-
-    private final String masterAddress;
-
-    private final StorageOperate storageOperate;
-
-    /**
-     * abstract task
-     */
-    private AbstractTask task;
-
-    /**
-     * task callback service
-     */
-    private final WorkerMessageSender workerMessageSender;
-
-    /**
-     * alert client server
-     */
-    private final AlertClientService alertClientService;
-
-    private TaskPluginManager taskPluginManager;
-
-    /**
-     * constructor
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @param workerMessageSender  used for worker send message to master
-     */
-    public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
-                             @NonNull String masterAddress,
-                             @NonNull WorkerMessageSender workerMessageSender,
-                             @NonNull AlertClientService alertClientService,
-                             StorageOperate storageOperate) {
-        this.taskExecutionContext = taskExecutionContext;
-        this.masterAddress = masterAddress;
-        this.workerMessageSender = workerMessageSender;
-        this.alertClientService = alertClientService;
-        this.storageOperate = storageOperate;
-    }
-
-    public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
-                             @NonNull String masterAddress,
-                             @NonNull WorkerMessageSender workerMessageSender,
-                             @NonNull AlertClientService alertClientService,
-                             @NonNull TaskPluginManager taskPluginManager,
-                             StorageOperate storageOperate) {
-        this.taskExecutionContext = taskExecutionContext;
-        this.masterAddress = masterAddress;
-        this.workerMessageSender = workerMessageSender;
-        this.alertClientService = alertClientService;
-        this.taskPluginManager = taskPluginManager;
-        this.storageOperate = storageOperate;
-    }
-
-    @Override
-    public void run() {
-        try {
-            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
-                    taskExecutionContext.getTaskInstanceId());
-            if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
-                taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
-                taskExecutionContext.setStartTime(new Date());
-                taskExecutionContext.setEndTime(new Date());
-                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-                workerMessageSender.sendMessageWithRetry(taskExecutionContext,
-                        masterAddress,
-                        CommandType.TASK_EXECUTE_RESULT);
-                logger.info("Task dry run success");
-                return;
-            }
-        } finally {
-            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
-        }
-        try {
-            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
-                    taskExecutionContext.getTaskInstanceId());
-            logger.info("script path : {}", taskExecutionContext.getExecutePath());
-            if (taskExecutionContext.getStartTime() == null) {
-                taskExecutionContext.setStartTime(new Date());
-            }
-            logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
-
-            // callback task execute running
-            taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
-            workerMessageSender.sendMessageWithRetry(taskExecutionContext,
-                    masterAddress,
-                    CommandType.TASK_EXECUTE_RUNNING);
-
-            // copy hdfs/minio file to local
-            List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
-                    taskExecutionContext.getResources());
-            if (!fileDownloads.isEmpty()) {
-                downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
-            }
-
-            taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
-
-            taskExecutionContext.setTaskAppId(String.format("%s_%s",
-                    taskExecutionContext.getProcessInstanceId(),
-                    taskExecutionContext.getTaskInstanceId()));
-
-            TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
-            if (null == taskChannel) {
-                throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.",
-                        taskExecutionContext.getTaskType()));
-            }
-            String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
-                    taskExecutionContext.getProcessDefineCode(),
-                    taskExecutionContext.getProcessDefineVersion(),
-                    taskExecutionContext.getProcessInstanceId(),
-                    taskExecutionContext.getTaskInstanceId());
-            taskExecutionContext.setTaskLogName(taskLogName);
-
-            // set the name of the current thread
-            Thread.currentThread().setName(taskLogName);
-
-            task = taskChannel.createTask(taskExecutionContext);
-
-            // task init
-            this.task.init();
-
-            // init varPool
-            this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
-
-            // task handle
-            this.task.handle();
-
-            // task result process
-            if (this.task.getNeedAlert()) {
-                sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus());
-            }
-
-            taskExecutionContext.setCurrentExecutionStatus(this.task.getExitStatus());
-            taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
-            taskExecutionContext.setProcessId(this.task.getProcessId());
-            taskExecutionContext.setAppIds(this.task.getAppIds());
-            taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
-            logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(),
-                    this.task.getExitStatus());
-        } catch (Throwable e) {
-            logger.error("task scheduler failure", e);
-            kill();
-            taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
-            taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
-            taskExecutionContext.setProcessId(this.task.getProcessId());
-            taskExecutionContext.setAppIds(this.task.getAppIds());
-        } finally {
-            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-            workerMessageSender.sendMessageWithRetry(taskExecutionContext,
-                    masterAddress,
-                    CommandType.TASK_EXECUTE_RESULT);
-            clearTaskExecPath();
-            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
-        }
-    }
-
-    private void sendAlert(TaskAlertInfo taskAlertInfo, TaskExecutionStatus status) {
-        int strategy =
-                status == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
-        alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(),
-                taskAlertInfo.getContent(), strategy);
-    }
-
-    /**
-     * when task finish, clear execute path.
-     */
-    private void clearTaskExecPath() {
-        logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
-
-        if (!CommonUtils.isDevelopMode()) {
-            // get exec dir
-            String execLocalPath = taskExecutionContext.getExecutePath();
-
-            if (Strings.isNullOrEmpty(execLocalPath)) {
-                logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName());
-                return;
-            }
-
-            if (SINGLE_SLASH.equals(execLocalPath)) {
-                logger.warn("task: {} exec local path is '/', direct deletion is not allowed",
-                        taskExecutionContext.getTaskName());
-                return;
-            }
-
-            try {
-                org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
-                logger.info("exec local path: {} cleared.", execLocalPath);
-            } catch (IOException e) {
-                if (e instanceof NoSuchFileException) {
-                    // this is expected
-                } else {
-                    logger.error("Delete exec dir failed.", e);
-                }
-            }
-        }
-    }
-
-    /**
-     * kill task
-     */
-    public void kill() {
-        if (task != null) {
-            try {
-                task.cancelApplication(true);
-                ProcessUtils.killYarnJob(taskExecutionContext);
-            } catch (Exception e) {
-                logger.error("Kill task failed", e);
-            }
-        }
-    }
-
-    /**
-     * download resource file
-     *
-     * @param execLocalPath execLocalPath
-     * @param fileDownloads projectRes
-     * @param logger logger
-     */
-    public void downloadResource(String execLocalPath, Logger logger, List<Pair<String, String>> fileDownloads) {
-        for (Pair<String, String> fileDownload : fileDownloads) {
-            try {
-                // query the tenant code of the resource according to the name of the resource
-                String fullName = fileDownload.getLeft();
-                String tenantCode = fileDownload.getRight();
-                String resPath = storageOperate.getResourceFileName(tenantCode, fullName);
-                logger.info("get resource file from path:{}", resPath);
-                long resourceDownloadStartTime = System.currentTimeMillis();
-                storageOperate.download(tenantCode, resPath, execLocalPath + File.separator + fullName, false, true);
-                WorkerServerMetrics
-                        .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
-                WorkerServerMetrics.recordWorkerResourceDownloadSize(
-                        Files.size(Paths.get(execLocalPath, fullName)));
-                WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
-            } catch (Exception e) {
-                WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
-                logger.error(e.getMessage(), e);
-                throw new ServiceException(e.getMessage());
-            }
-        }
-    }
-
-    /**
-     * download resource check
-     *
-     * @param execLocalPath
-     * @param projectRes
-     * @return
-     */
-    public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes) {
-        if (MapUtils.isEmpty(projectRes)) {
-            return Collections.emptyList();
-        }
-        List<Pair<String, String>> downloadFile = new ArrayList<>();
-        projectRes.forEach((key, value) -> {
-            File resFile = new File(execLocalPath, key);
-            boolean notExist = !resFile.exists();
-            if (notExist) {
-                downloadFile.add(Pair.of(key, value));
-            } else {
-                logger.info("file : {} exists ", resFile.getName());
-            }
-        });
-        if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()) {
-            throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
-        }
-        return downloadFile;
-    }
-
-    /**
-     * get current TaskExecutionContext
-     *
-     * @return TaskExecutionContext
-     */
-    public TaskExecutionContext getTaskExecutionContext() {
-        return this.taskExecutionContext;
-    }
-
-    @Override
-    public long getDelay(TimeUnit unit) {
-        return unit.convert(DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
-                taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS);
-    }
-
-    @Override
-    public int compareTo(Delayed o) {
-        if (o == null) {
-            return 1;
-        }
-        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
-    }
-
-    public AbstractTask getTask() {
-        return task;
-    }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
new file mode 100644
index 0000000000..73e14c5132
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+public abstract class WorkerDelayTaskExecuteRunnable extends WorkerTaskExecuteRunnable implements Delayed {
+
+    protected WorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
+                                             @NonNull WorkerConfig workerConfig,
+                                             @NonNull String masterAddress,
+                                             @NonNull WorkerMessageSender workerMessageSender,
+                                             @NonNull AlertClientService alertClientService,
+                                             @NonNull TaskPluginManager taskPluginManager,
+                                             @Nullable StorageOperate storageOperate) {
+        super(taskExecutionContext, workerConfig, masterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
+    }
+
+    /** Remaining delay in {@code unit}; DateUtils.getRemainTime is fed the first-submit time and delayTime * 60. */
+    @Override
+    public long getDelay(TimeUnit unit) {
+        final TaskExecutionContext context = getTaskExecutionContext();
+        final long remainSeconds = DateUtils.getRemainTime(context.getFirstSubmitTime(), context.getDelayTime() * 60L);
+        return unit.convert(remainSeconds, TimeUnit.SECONDS);
+    }
+
+    /** Orders tasks by their remaining delay in milliseconds; a {@code null} peer yields 1. */
+    @Override
+    public int compareTo(Delayed o) {
+        return o == null
+                ? 1
+                : Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+    }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
new file mode 100644
index 0000000000..44bb8878d4
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import javax.annotation.Nullable;
+
+public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends WorkerDelayTaskExecuteRunnable> implements WorkerTaskExecuteRunnableFactory<T> {
+    // Collaborators fixed at construction time; exposed as protected fields for concrete factories.
+    protected final @NonNull TaskExecutionContext taskExecutionContext;
+    protected final @NonNull WorkerConfig workerConfig;
+    protected final @NonNull String workflowMasterAddress;
+    protected final @NonNull WorkerMessageSender workerMessageSender;
+    protected final @NonNull AlertClientService alertClientService;
+    protected final @NonNull TaskPluginManager taskPluginManager;
+    protected final @Nullable StorageOperate storageOperate;
+
+    protected WorkerDelayTaskExecuteRunnableFactory(
+            @NonNull TaskExecutionContext taskExecutionContext,
+            @NonNull WorkerConfig workerConfig,
+            @NonNull String workflowMasterAddress,
+            @NonNull WorkerMessageSender workerMessageSender,
+            @NonNull AlertClientService alertClientService,
+            @NonNull TaskPluginManager taskPluginManager,
+            @Nullable StorageOperate storageOperate) {
+        this.taskExecutionContext = taskExecutionContext;
+        this.workerConfig = workerConfig;
+        this.workflowMasterAddress = workflowMasterAddress;
+        this.workerMessageSender = workerMessageSender;
+        this.alertClientService = alertClientService;
+        this.taskPluginManager = taskPluginManager;
+        this.storageOperate = storageOperate;
+    }
+
+    @Override
+    public abstract T createWorkerTaskExecuteRunnable();
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
index 46ecfd0d5b..3bdf9534a1 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -50,17 +50,17 @@ public class WorkerExecService {
     /**
      * running task
      */
-    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
+    private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap;
 
     public WorkerExecService(ExecutorService execService,
-                             ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
+                             ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap) {
         this.execService = execService;
         this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
         this.taskExecuteThreadMap = taskExecuteThreadMap;
         WorkerServerMetrics.registerWorkerRunningTaskGauge(taskExecuteThreadMap::size);
     }
 
-    public void submit(TaskExecuteThread taskExecuteThread) {
+    public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) {
         taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
         ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
         FutureCallback futureCallback = new FutureCallback() {
@@ -91,7 +91,7 @@ public class WorkerExecService {
         return ((ThreadPoolExecutor) this.execService).getQueue().size();
     }
 
-    public Map<Integer, TaskExecuteThread> getTaskExecuteThreadMap() {
+    public Map<Integer, WorkerTaskExecuteRunnable> getTaskExecuteThreadMap() {
         return taskExecuteThreadMap;
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 2466700da1..41824a9a44 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -19,17 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.common.storage.StorageOperate;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.concurrent.BlockingQueue;
+import javax.annotation.Nullable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
 
@@ -41,31 +39,16 @@ public class WorkerManagerThread implements Runnable {
 
     private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
 
-    /**
-     * task queue
-     */
-    private final BlockingQueue<TaskExecuteThread> waitSubmitQueue;
+    private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue;
 
-    @Autowired(required = false)
-    private StorageOperate storageOperate;
-
-    /**
-     * thread executor service
-     */
     private final WorkerExecService workerExecService;
 
-    /**
-     * task callback service
-     */
-    @Autowired
-    private WorkerMessageSender workerMessageSender;
-
-    private volatile int workerExecThreads;
+    private final int workerExecThreads;
 
     /**
      * running task
      */
-    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap<>();
 
     public WorkerManagerThread(WorkerConfig workerConfig) {
         workerExecThreads = workerConfig.getExecThreads();
@@ -75,8 +58,8 @@ public class WorkerManagerThread implements Runnable {
                 taskExecuteThreadMap);
     }
 
-    public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
-        return this.taskExecuteThreadMap.get(taskInstanceId);
+    public @Nullable WorkerTaskExecuteRunnable getTaskExecuteThread(Integer taskInstanceId) {
+        return taskExecuteThreadMap.get(taskInstanceId);
     }
 
     /**
@@ -94,7 +77,7 @@ public class WorkerManagerThread implements Runnable {
      * @return queue size
      */
     public int getThreadPoolQueueSize() {
-        return this.workerExecService.getThreadPoolQueueSize();
+        return workerExecService.getThreadPoolQueueSize();
     }
 
     /**
@@ -108,13 +91,7 @@ public class WorkerManagerThread implements Runnable {
                 .forEach(waitSubmitQueue::remove);
     }
 
-    /**
-     * submit task
-     *
-     * @param taskExecuteThread taskExecuteThread
-     * @return submit result
-     */
-    public boolean offer(TaskExecuteThread taskExecuteThread) {
+    public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
         if (waitSubmitQueue.size() > workerExecThreads) {
             WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
             // if waitSubmitQueue is full, it will wait 1s, then try add
@@ -123,7 +100,7 @@ public class WorkerManagerThread implements Runnable {
                 return false;
             }
         }
-        return waitSubmitQueue.offer(taskExecuteThread);
+        return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
     }
 
     public void start() {
@@ -137,15 +114,14 @@ public class WorkerManagerThread implements Runnable {
     @Override
     public void run() {
         Thread.currentThread().setName("Worker-Execute-Manager-Thread");
-        TaskExecuteThread taskExecuteThread;
         while (!ServerLifeCycleManager.isStopped()) {
             try {
                 if (!ServerLifeCycleManager.isRunning()) {
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                 }
                 if (this.getThreadPoolQueueSize() <= workerExecThreads) {
-                    taskExecuteThread = waitSubmitQueue.take();
-                    workerExecService.submit(taskExecuteThread);
+                    final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
+                    workerExecService.submit(workerDelayTaskExecuteRunnable);
                 } else {
                     WorkerServerMetrics.incWorkerOverloadCount();
                     logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",
@@ -161,7 +137,17 @@ public class WorkerManagerThread implements Runnable {
 
     public void clearTask() {
         waitSubmitQueue.clear();
-        workerExecService.getTaskExecuteThreadMap().values().forEach(TaskExecuteThread::kill);
+        workerExecService.getTaskExecuteThreadMap().values().forEach(workerTaskExecuteRunnable -> {
+            int taskInstanceId = workerTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
+            try {
+                workerTaskExecuteRunnable.cancelTask();
+                logger.info("Cancel the taskInstance in worker  {}", taskInstanceId);
+            } catch (Exception ex) {
+                logger.error("Cancel the taskInstance error {}", taskInstanceId, ex);
+            } finally {
+                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+            }
+        });
         workerExecService.getTaskExecuteThreadMap().clear();
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
new file mode 100644
index 0000000000..bd8beafd6b
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import com.google.common.base.Strings;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Date;
+
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+
+public abstract class WorkerTaskExecuteRunnable implements Runnable {
+
+    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, WorkerTaskExecuteRunnable.class));
+
+    protected final TaskExecutionContext taskExecutionContext;
+    protected final WorkerConfig workerConfig;
+    protected final String masterAddress;
+    protected final WorkerMessageSender workerMessageSender;
+    protected final AlertClientService alertClientService;
+    protected final TaskPluginManager taskPluginManager;
+    protected final @Nullable StorageOperate storageOperate;
+
+    protected @Nullable AbstractTask task;
+
+    protected WorkerTaskExecuteRunnable(
+            @NonNull TaskExecutionContext taskExecutionContext,
+            @NonNull WorkerConfig workerConfig,
+            @NonNull String masterAddress,
+            @NonNull WorkerMessageSender workerMessageSender,
+            @NonNull AlertClientService alertClientService,
+            @NonNull TaskPluginManager taskPluginManager,
+            @Nullable StorageOperate storageOperate) {
+        this.taskExecutionContext = taskExecutionContext;
+        this.workerConfig = workerConfig;
+        this.masterAddress = masterAddress;
+        this.workerMessageSender = workerMessageSender;
+        this.alertClientService = alertClientService;
+        this.taskPluginManager = taskPluginManager;
+        this.storageOperate = storageOperate;
+        // the log name is stored on the context here; run() later applies it as the thread name
+        final String logName = LoggerUtils.buildTaskId(
+                taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getProcessDefineCode(),
+                taskExecutionContext.getProcessDefineVersion(), taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId());
+        taskExecutionContext.setTaskLogName(logName);
+        logger.info("Set task logger name: {}", logName);
+    }
+
+    protected abstract void executeTask(); // called by run() after beforeExecute() and before afterExecute()
+
+    protected void afterExecute() throws TaskException {
+        if (task == null) {
+            throw new TaskException("The current task instance is null");
+        }
+        sendAlertIfNeeded();
+        // the result message is sent to the master before any local cleanup starts
+        sendTaskResult();
+        // local cleanup: evict the cached context, then delete the execute path (unless in develop mode)
+        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        logger.info("Remove the current task execute context from worker cache");
+        clearTaskExecPathIfNeeded();
+    }
+
+    protected void afterThrowing(Throwable throwable) throws TaskException {
+        cancelTask(); // best effort: cancelTask() does nothing without a task and logs its own failures
+        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
+        taskExecutionContext.setEndTime(new Date());
+        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT); // report the FAILURE result to the master
+        logger.info("Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", TaskExecutionStatus.FAILURE);
+    }
+
+    public void cancelTask() {
+        if (task == null) { // nothing to cancel: no task plugin instance has been created
+            return;
+        }
+        try {
+            task.cancelApplication(true);
+            ProcessUtils.killYarnJob(taskExecutionContext);
+        } catch (Exception e) {
+            logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            // set the thread name so that the logs are written to the task log file
+            Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
+
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
+            logger.info("Begin to pulling task");
+
+            initializeTask();
+            // dry run: report SUCCESS right after initialization; beforeExecute/executeTask/afterExecute are skipped
+            if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
+                taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
+                taskExecutionContext.setEndTime(new Date());
+                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+                workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
+                logger.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
+                return;
+            }
+            // normal path: prepare (beforeExecute creates the plugin task), execute, then report and clean up
+            beforeExecute();
+
+            executeTask();
+
+            afterExecute();
+            // any Throwable raised above is logged and turned into a FAILURE result by afterThrowing()
+        } catch (Throwable ex) {
+            logger.error("Task execute failed, due to meet an exception", ex);
+            afterThrowing(ex);
+        } finally {
+            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+        }
+    }
+
+    protected void initializeTask() {
+        logger.info("Begin to initialize task");
+        // stamp the start time on the execution context
+        final Date startedAt = new Date();
+        taskExecutionContext.setStartTime(startedAt);
+        logger.info("Set task startTime: {}", startedAt);
+        // record the system environment file path reported by CommonUtils
+        final String envFilePath = CommonUtils.getSystemEnvPath();
+        taskExecutionContext.setEnvFile(envFilePath);
+        logger.info("Set task envFile: {}", envFilePath);
+        // the app id has the form "<processInstanceId>_<taskInstanceId>"
+        final String appId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
+        taskExecutionContext.setTaskAppId(appId);
+        logger.info("Set task appId: {}", appId);
+
+        logger.info("End initialize task");
+    }
+
+    protected void beforeExecute() {
+        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
+        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING);
+        logger.info("Set task status to {}", TaskExecutionStatus.RUNNING_EXECUTION);
+        // environment checks; presumably each helper throws on failure -- confirm in TaskExecutionCheckerUtils
+        TaskExecutionCheckerUtils.checkTenantExist(workerConfig, taskExecutionContext);
+        logger.info("TenantCode:{} check success", taskExecutionContext.getTenantCode());
+
+        TaskExecutionCheckerUtils.createProcessLocalPathIfAbsent(taskExecutionContext);
+        logger.info("ProcessExecDir:{} check success", taskExecutionContext.getExecutePath());
+
+        TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
+        logger.info("Resources:{} check success", taskExecutionContext.getResources());
+        // look up the plugin channel registered for this task type and let it create the task instance
+        TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
+        if (null == taskChannel) {
+            throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", taskExecutionContext.getTaskType()));
+        }
+        task = taskChannel.createTask(taskExecutionContext);
+        if (task == null) {
+            throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", taskExecutionContext.getTaskType()));
+        }
+        logger.info("Task plugin: {} create success", taskExecutionContext.getTaskType());
+        // initialize the plugin task, then hand it the var pool carried by the execution context
+        task.init();
+        logger.info("Success initialized task plugin instance success");
+
+        task.getParameters().setVarPool(taskExecutionContext.getVarPool());
+        logger.info("Success set taskVarPool: {}", taskExecutionContext.getVarPool());
+
+    }
+
+    protected void sendAlertIfNeeded() {
+        // task must be non-null here; afterExecute() checks that before calling this method
+        if (!task.getNeedAlert()) {
+            return;
+        }
+        logger.info("The current task need to send alert, begin to send alert");
+        final int warningType = task.getExitStatus() == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
+        final TaskAlertInfo alertInfo = task.getTaskAlertInfo();
+        alertClientService.sendAlert(alertInfo.getAlertGroupId(), alertInfo.getTitle(), alertInfo.getContent(), warningType);
+        logger.info("Success send alert");
+    }
+
+    protected void sendTaskResult() {
+        taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus()); // copy the task's final state onto the context before sending
+        taskExecutionContext.setEndTime(new Date());
+        taskExecutionContext.setProcessId(task.getProcessId());
+        taskExecutionContext.setAppIds(task.getAppIds());
+        taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
+        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
+        // the logged status is read back from the context, i.e. the exit status copied from the task above
+        logger.info("Send task execute result to master, the current task status: {}", taskExecutionContext.getCurrentExecutionStatus());
+    }
+
+    /** Removes the task's execute path unless develop mode is on; I/O failures are logged, not rethrown. */
+    protected void clearTaskExecPathIfNeeded() {
+        String execLocalPath = taskExecutionContext.getExecutePath();
+        if (CommonUtils.isDevelopMode()) {
+            logger.info("The current execute mode is develop mode, will not clear the task execute file: {}", execLocalPath);
+            return;
+        }
+        logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}", execLocalPath);
+
+        if (Strings.isNullOrEmpty(execLocalPath)) {
+            // nothing to delete when the context carries no execute path
+            logger.warn("The execute path of task {} is empty, no need to clear", taskExecutionContext.getTaskName());
+            return;
+        }
+
+        // never delete the filesystem root
+        if (SINGLE_SLASH.equals(execLocalPath)) {
+            logger.warn("The task execute file is '/', direct deletion is not allowed");
+            return;
+        }
+
+        try {
+            org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
+            logger.info("Success clear the task execute file: {}", execLocalPath);
+        } catch (NoSuchFileException ignored) {
+            // this is expected: there is no such path to delete
+        } catch (IOException e) {
+            logger.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", execLocalPath, e);
+        }
+    }
+
+    public @NonNull TaskExecutionContext getTaskExecutionContext() {
+        return taskExecutionContext; // final field, assigned once in the constructor
+    }
+
+    public @Nullable AbstractTask getTask() {
+        return task; // within this class only beforeExecute() assigns it, so it may still be null
+    }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java
new file mode 100644
index 0000000000..441662f4bc
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+public interface WorkerTaskExecuteRunnableFactory<T> {
+    /** Creates the worker task execute runnable of type {@code T} produced by this factory. */
+    T createWorkerTaskExecuteRunnable();
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
new file mode 100644
index 0000000000..f3edfd4c1d
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import javax.annotation.Nullable;
+
+/**
+ * Static helper that assembles {@link WorkerDelayTaskExecuteRunnableFactory} instances.
+ */
+@UtilityClass
+public class WorkerTaskExecuteRunnableFactoryBuilder {
+
+    /** Builds the default factory from the given collaborators; only {@code storageOperate} may be null. */
+    public static WorkerDelayTaskExecuteRunnableFactory<?> createWorkerDelayTaskExecuteRunnableFactory(
+            @NonNull TaskExecutionContext taskExecutionContext,
+            @NonNull WorkerConfig workerConfig,
+            @NonNull String workflowMasterAddress,
+            @NonNull WorkerMessageSender workerMessageSender,
+            @NonNull AlertClientService alertClientService,
+            @NonNull TaskPluginManager taskPluginManager,
+            @Nullable StorageOperate storageOperate) {
+        return new DefaultWorkerDelayTaskExecuteRunnableFactory(
+                taskExecutionContext, workerConfig, workflowMasterAddress, workerMessageSender,
+                alertClientService, taskPluginManager, storageOperate);
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
new file mode 100644
index 0000000000..50ac13f331
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers that validate and prepare the local environment for a task execution:
+ * tenant lookup, execute directory creation and resource download.
+ */
+public class TaskExecutionCheckerUtils {
+
+    /**
+     * Verifies that the task's tenant code maps to an OS user. Unless the distributed-user Linux path
+     * is taken, the user is created first when sudo and tenant auto-create are both enabled.
+     *
+     * @throws TaskException if the user does not exist or the lookup itself fails (cause attached)
+     */
+    public static void checkTenantExist(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
+        String tenantCode = taskExecutionContext.getTenantCode();
+        try {
+            boolean osUserExistFlag;
+            if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
+                // distributed users on linux are checked with the id command and are never auto-created
+                osUserExistFlag = OSUtils.existTenantCodeInLinux(tenantCode);
+            } else {
+                if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
+                    OSUtils.createUserIfAbsent(tenantCode);
+                }
+                osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
+            }
+            if (!osUserExistFlag) {
+                throw new TaskException(String.format("TenantCode: %s doesn't exist", tenantCode));
+            }
+        } catch (TaskException ex) {
+            throw ex;
+        } catch (Exception ex) {
+            // same message as before, but the cause is attached so the real failure is not lost
+            throw new TaskException(String.format("TenantCode: %s doesn't exist", tenantCode), ex);
+        }
+    }
+
+    /**
+     * Resolves the task's local execute directory, stores it on the context and creates it if absent.
+     *
+     * @throws TaskException wrapping any {@link Throwable} raised while resolving or creating the directory
+     */
+    public static void createProcessLocalPathIfAbsent(TaskExecutionContext taskExecutionContext) throws TaskException {
+        try {
+            String execLocalPath = FileUtils.getProcessExecDir(
+                    taskExecutionContext.getProjectCode(),
+                    taskExecutionContext.getProcessDefineCode(),
+                    taskExecutionContext.getProcessDefineVersion(),
+                    taskExecutionContext.getProcessInstanceId(),
+                    taskExecutionContext.getTaskInstanceId());
+            taskExecutionContext.setExecutePath(execLocalPath);
+            FileUtils.createWorkDirIfAbsent(execLocalPath);
+        } catch (Throwable ex) {
+            throw new TaskException("Cannot create process execute dir", ex);
+        }
+    }
+
+    /**
+     * Downloads every resource listed on the context that is not already present in the execute path,
+     * recording download time, download size and success/failure counters.
+     *
+     * @throws StorageOperateNoConfiguredException if a download is needed but no storage is available
+     * @throws TaskException if any single download fails
+     */
+    public static void downloadResourcesIfNeeded(StorageOperate storageOperate, TaskExecutionContext taskExecutionContext, Logger logger) {
+        String execLocalPath = taskExecutionContext.getExecutePath();
+        Map<String, String> projectRes = taskExecutionContext.getResources();
+        if (MapUtils.isEmpty(projectRes)) {
+            return;
+        }
+        List<Pair<String, String>> downloadFiles = new ArrayList<>();
+        projectRes.forEach((key, value) -> {
+            File resFile = new File(execLocalPath, key);
+            if (resFile.exists()) {
+                logger.info("file : {} exists ", resFile.getName());
+            } else {
+                downloadFiles.add(Pair.of(key, value));
+            }
+        });
+        if (CollectionUtils.isEmpty(downloadFiles)) {
+            return;
+        }
+        // a null storageOperate would otherwise surface as an opaque NullPointerException per file
+        if (storageOperate == null || !PropertyUtils.getResUploadStartupState()) {
+            throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
+        }
+        for (Pair<String, String> fileDownload : downloadFiles) {
+            try {
+                // the map key is the resource full name, the map value is the tenant code used for the lookup
+                String fullName = fileDownload.getLeft();
+                String tenantCode = fileDownload.getRight();
+                String resPath = storageOperate.getResourceFileName(tenantCode, fullName);
+                logger.info("get resource file from path:{}", resPath);
+                long resourceDownloadStartTime = System.currentTimeMillis();
+                storageOperate.download(tenantCode, resPath, execLocalPath + File.separator + fullName, false, true);
+                WorkerServerMetrics
+                        .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
+                WorkerServerMetrics.recordWorkerResourceDownloadSize(
+                        Files.size(Paths.get(execLocalPath, fullName)));
+                WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
+            } catch (Exception e) {
+                WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
+                throw new TaskException(String.format("Download resource file: %s error", fileDownload), e);
+            }
+        }
+    }
+}
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
index e66563c0b4..0bf8a26c45 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
@@ -17,156 +17,76 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
+import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
-import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-
-import java.util.Date;
-import java.util.concurrent.ExecutorService;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.util.Date;
 
 /**
  * test task execute processor
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class, WorkerConfig.class, FileUtils.class, JsonSerializer.class,
-        JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
-@Ignore
 public class TaskDispatchProcessorTest {
 
-    private TaskExecutionContext taskExecutionContext;
-
-    private WorkerMessageSender workerMessageSender;
-
-    private ExecutorService workerExecService;
-
-    private StorageOperate storageOperate;
+    @InjectMocks
+    private TaskDispatchProcessor taskDispatchProcessor;
 
+    @Mock
     private WorkerConfig workerConfig;
 
-    private Command command;
+    @Mock
+    private WorkerMessageSender workerMessageSender;
 
-    private Command ackCommand;
+    @Mock
+    private AlertClientService alertClientService;
 
-    private TaskDispatchCommand taskRequestCommand;
+    @Mock
+    private TaskPluginManager taskPluginManager;
 
-    private AlertClientService alertClientService;
+    @Mock
+    private WorkerManagerThread workerManagerThread;
 
-    private WorkerManagerThread workerManager;
-
-    @Before
-    public void before() throws Exception {
-        // init task execution context
-        taskExecutionContext = getTaskExecutionContext();
-        workerConfig = new WorkerConfig();
-        workerConfig.setExecThreads(1);
-        workerConfig.setListenPort(1234);
-        command = new Command();
-        command.setType(CommandType.TASK_DISPATCH_REQUEST);
-        ackCommand = new TaskExecuteRunningCommand("127.0.0.1:1234",
-                "127.0.0.1:5678",
-                System.currentTimeMillis()).convert2Command();
-        taskRequestCommand = new TaskDispatchCommand(taskExecutionContext,
-                "127.0.0.1:5678",
-                "127.0.0.1:1234",
-                System.currentTimeMillis());
-        alertClientService = PowerMockito.mock(AlertClientService.class);
-        workerExecService = PowerMockito.mock(ExecutorService.class);
-        PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))).thenReturn(null);
-
-        PowerMockito.mockStatic(ChannelUtils.class);
-        PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
-
-        workerMessageSender = PowerMockito.mock(WorkerMessageSender.class);
-
-        PowerMockito.mockStatic(SpringApplicationContext.class);
-        PowerMockito.when(SpringApplicationContext.getBean(WorkerMessageSender.class)).thenReturn(workerMessageSender);
-        PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
-
-        workerManager = PowerMockito.mock(WorkerManagerThread.class);
-
-        storageOperate = PowerMockito.mock(StorageOperate.class);
-        PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext,
-                "127.0.0.1:5678",
-                workerMessageSender,
-                alertClientService,
-                storageOperate))).thenReturn(Boolean.TRUE);
-
-        PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager);
-
-        PowerMockito.mockStatic(ThreadUtils.class);
-        PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
-                workerConfig.getExecThreads())).thenReturn(
-                        workerExecService);
-
-        PowerMockito.mockStatic(JsonSerializer.class);
-        PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskDispatchCommand.class)).thenReturn(
-                taskRequestCommand);
-
-        PowerMockito.mockStatic(JSONUtils.class);
-        PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class)).thenReturn(
-                taskRequestCommand);
-
-        PowerMockito.mockStatic(FileUtils.class);
-        PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
-                taskExecutionContext.getProcessDefineCode(),
-                taskExecutionContext.getProcessDefineVersion(),
-                taskExecutionContext.getProcessInstanceId(),
-                taskExecutionContext.getTaskInstanceId())).thenReturn(
-                        taskExecutionContext.getExecutePath());
-        PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
-
-        SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(new TaskExecutionContext(),
-                workerMessageSender,
-                "127.0.0.1:5678",
-                LoggerFactory.getLogger(
-                        TaskDispatchProcessorTest.class),
-                alertClientService,
-                storageOperate);
-        PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments().thenReturn(simpleTaskExecuteThread);
-    }
+    @Mock
+    private StorageOperate storageOperate;
 
     @Test
-    public void testNormalExecution() {
-        TaskDispatchProcessor processor = new TaskDispatchProcessor();
-        processor.process(null, command);
+    public void process() {
+        Channel channel = Mockito.mock(Channel.class);
+        TaskChannel taskChannel = Mockito.mock(TaskChannel.class);
+        Mockito.when(taskPluginManager.getTaskChannel(Mockito.anyString())).thenReturn(taskChannel);
 
-        Assert.assertEquals(TaskExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
+        TaskExecutionContext taskExecutionContext = getTaskExecutionContext();
+        Command dispatchCommand = createDispatchCommand(taskExecutionContext);
+        taskDispatchProcessor.process(channel, dispatchCommand);
+        // Checks: at most one offer() (zero also passes) and no TASK_REJECT message for this context.
+        Mockito.verify(workerManagerThread, Mockito.atMostOnce()).offer(Mockito.any());
+        Mockito.verify(workerMessageSender, Mockito.never()).sendMessageWithRetry(taskExecutionContext, "localhost:5678", CommandType.TASK_REJECT);
     }
 
-    @Test
-    public void testDelayExecution() {
-        taskExecutionContext.setDelayTime(1);
-        TaskDispatchProcessor processor = new TaskDispatchProcessor();
-        processor.process(null, command);
 
-        Assert.assertEquals(TaskExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
+    public Command createDispatchCommand(TaskExecutionContext taskExecutionContext) {
+        // presumably master address first (5678), worker address second (1234) - confirm in TaskDispatchCommand
+        TaskDispatchCommand taskDispatchCommand = new TaskDispatchCommand(taskExecutionContext,
+                "localhost:5678",
+                "localhost:1234",
+                System.currentTimeMillis());
+        return taskDispatchCommand.convert2Command();
+    }
 
     public TaskExecutionContext getTaskExecutionContext() {
@@ -184,21 +104,4 @@ public class TaskDispatchProcessorTest {
         taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
         return taskExecutionContext;
     }
-
-    private static class SimpleTaskExecuteThread extends TaskExecuteThread {
-
-        public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext,
-                                       WorkerMessageSender workerMessageSender,
-                                       String masterAddress,
-                                       Logger taskLogger,
-                                       AlertClientService alertClientService,
-                                       StorageOperate storageOperate) {
-            super(taskExecutionContext, masterAddress, workerMessageSender, alertClientService, storageOperate);
-        }
-
-        @Override
-        public void run() {
-            //
-        }
-    }
 }
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
new file mode 100644
index 0000000000..39f03119ed
--- /dev/null
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import java.util.Date;
+
+/**
+ * Unit test for {@link DefaultWorkerDelayTaskExecuteRunnable}; all collaborators are Mockito mocks.
+ */
+public class DefaultWorkerDelayTaskExecuteRunnableTest {
+
+    private final WorkerConfig workerConfig = Mockito.mock(WorkerConfig.class);
+
+    private final String masterAddress = "localhost:5678";
+
+    private final WorkerMessageSender workerMessageSender = Mockito.mock(WorkerMessageSender.class);
+
+    private final AlertClientService alertClientService = Mockito.mock(AlertClientService.class);
+
+    private final TaskPluginManager taskPluginManager = Mockito.mock(TaskPluginManager.class);
+
+    private final StorageOperate storageOperate = Mockito.mock(StorageOperate.class);
+
+    /** Running a dry-run context must not throw and must leave the context in SUCCESS status. */
+    @Test
+    public void testDryRun() {
+        TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder()
+                .dryRun(Constants.DRY_RUN_FLAG_YES)
+                .taskInstanceId(0)
+                .processDefineId(0)
+                .firstSubmitTime(new Date())
+                .taskLogName("TestLogName")
+                .build();
+        WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable(
+                taskExecutionContext,
+                workerConfig,
+                masterAddress,
+                workerMessageSender,
+                alertClientService,
+                taskPluginManager,
+                storageOperate);
+
+        Assertions.assertDoesNotThrow(workerTaskExecuteRunnable::run);
+        Assertions.assertEquals(TaskExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
deleted file mode 100644
index bd492c7ff6..0000000000
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(PowerMockRunner.class)
-public class TaskExecuteThreadTest {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
-
-    @Mock
-    private TaskExecutionContext taskExecutionContext;
-
-    @Mock
-    private WorkerMessageSender workerMessageSender;
-
-    @Mock
-    private AlertClientService alertClientService;
-
-    @Mock
-    private StorageOperate storageOperate;
-
-    @Mock
-    private TaskPluginManager taskPluginManager;
-
-    @Test
-    public void checkTest() {
-        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext,
-                                                                    "127.0.0.1:5678",
-                                                                    workerMessageSender,
-                                                                    alertClientService,
-                                                                    taskPluginManager,
-                                                                    storageOperate);
-
-        String path = "/";
-        Map<String, String> projectRes = new HashMap<>();
-        projectRes.put("shell", "shell.sh");
-        List<Pair<String, String>> downloads = new ArrayList<>();
-        try {
-            downloads = taskExecuteThread.downloadCheck(path, projectRes);
-        } catch (Exception e) {
-            Assert.assertNotNull(e);
-        }
-        downloads.add(Pair.of("shell", "shell.sh"));
-        try{
-            taskExecuteThread.downloadResource(path, LOGGER, downloads);
-        }catch (Exception e){
-
-        }
-    }
-}