You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/10/28 06:50:16 UTC

[dolphinscheduler] 01/01: Implement isolate task api interface

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev_wenjun_isolateTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 8268714c9ac26cb2602ebf762ed8284d2a830123
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Oct 12 22:14:37 2022 +0800

    Implement isolate task api interface
---
 .../api/checker/IsolationTaskChecker.java          |  34 +++
 .../api/checker/IsolationTaskCheckerImpl.java      | 206 +++++++++++++++++
 .../api/checker/WorkflowInstanceCheckerImpl.java   |  24 +-
 .../constants/ApiFuncIdentificationConstant.java   |   5 +
 .../api/controller/IsolationTaskController.java    |  96 ++++++++
 .../dto/request/CleanTaskInstanceStateRequest.java |   2 +
 ...equest.java => IsolationTaskCancelRequest.java} |   8 +-
 ...equest.java => IsolationTaskDeleteRequest.java} |   6 +-
 .../dto/request/IsolationTaskListingRequest.java   |  24 ++
 .../dto/request/IsolationTaskSubmitRequest.java    |  18 ++
 .../apache/dolphinscheduler/api/enums/Status.java  |  39 +++-
 .../api/remote/ApiServerRPCClient.java             |  35 +++
 .../api/service/IsolationTaskService.java          |  32 +++
 .../api/service/ProcessInstanceService.java        |   8 +-
 .../api/service/ProjectService.java                |   2 +
 .../api/service/WorkflowDAGService.java            |  15 ++
 .../api/service/impl/IsolationTaskServiceImpl.java | 246 +++++++++++++++++++++
 .../api/service/impl/ProjectServiceImpl.java       |   7 +-
 .../api/service/impl/TaskInstanceServiceImpl.java  |  25 ++-
 .../api/service/impl/WorkflowDagServiceImpl.java   |  64 ++++++
 .../api/transformer/CommandTransformer.java        |  13 +-
 .../api/transformer/CommandTransformerImpl.java    |  70 +++---
 .../api/vo/IsolationTaskExcelParseVO.java          |  25 +++
 .../src/main/resources/static/excel-en.xlsx        | Bin 0 -> 5718 bytes
 .../apache/dolphinscheduler/common/Constants.java  |   5 +-
 .../dolphinscheduler/common/enums/CommandType.java |   2 +
 .../dao/dto/IsolationTaskStatus.java               |  27 +++
 .../dolphinscheduler/dao/entity/IsolationTask.java |  37 ++++
 .../dao/mapper/IsolationTaskMapper.java            |  32 +++
 .../dao/repository/IsolationTaskDao.java           |  39 ++++
 .../dao/repository/ProcessDefinitionLogDao.java    |  12 +
 .../dao/repository/ProcessInstanceDao.java         |   1 +
 .../dao/repository/ProcessTaskRelationLogDao.java  |  11 +
 .../dao/repository/TaskDefinitionLogDao.java       |  12 +
 .../dao/repository/TaskInstanceDao.java            |   2 +
 .../dao/repository/impl/IsolationTaskDaoImpl.java  |  86 +++++++
 .../impl/ProcessDefinitionLogDaoImpl.java          |  25 +++
 .../impl/ProcessTaskRelationLogDaoImpl.java        |  24 ++
 .../repository/impl/TaskDefinitionLogDaoImpl.java  |  44 ++++
 .../dao/repository/impl/TaskInstanceDaoImpl.java   |   6 +
 .../dolphinscheduler/dao/utils/DagHelper.java      |  99 ++++++++-
 .../dao/mapper/IsolationTaskMapper.xml             |  97 ++++++++
 .../src/main/resources/sql/dolphinscheduler_h2.sql |  17 ++
 .../main/resources/sql/dolphinscheduler_mysql.sql  |  73 +++---
 .../resources/sql/dolphinscheduler_postgresql.sql  |  18 ++
 .../dolphinscheduler/dao/utils/DagHelperTest.java  |  33 ++-
 .../isolation/RefreshIsolationTaskProcessor.java   |  26 +++
 .../server/master/rpc/MasterRPCServer.java         |   8 +
 .../master/runner/FailoverExecuteThread.java       |   5 +
 .../master/runner/MasterSchedulerBootstrap.java    |   7 +-
 .../master/runner/WorkflowExecuteRunnable.java     | 130 ++++++++++-
 .../master/runner/task/BaseTaskProcessor.java      |  16 ++
 .../master/runner/task/BlockingTaskProcessor.java  |  11 +-
 .../master/runner/task/CommonTaskProcessor.java    |  24 +-
 .../master/runner/task/ConditionTaskProcessor.java |  11 +-
 .../master/runner/task/DependentTaskProcessor.java |   8 +
 .../master/runner/task/SubTaskProcessor.java       |  11 +
 .../master/runner/task/SwitchTaskProcessor.java    |   8 +
 .../server/master/runner/task/TaskAction.java      |   1 +
 .../master/service/IsolationTaskManager.java       |  97 ++++++++
 .../server/master/utils/TaskInstanceUtils.java     |   1 -
 .../server/master/utils/WorkflowInstanceUtils.java |  18 ++
 .../master/runner/WorkflowExecuteRunnableTest.java |   6 +-
 .../remote/command/CommandType.java                |   6 +-
 .../isolation/RefreshIsolationTaskRequest.java     |  18 ++
 .../service/process/ProcessServiceImpl.java        |  27 ++-
 .../plugin/task/api/enums/ExecutionStatus.java     |  26 ++-
 67 files changed, 2037 insertions(+), 134 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
new file mode 100644
index 0000000000..9ee19f10be
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
@@ -0,0 +1,34 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.List;
+
+public interface IsolationTaskChecker {
+
+    void checkCanSubmitTaskIsolation(@NonNull User loginUser,
+                                     long projectCode,
+                                     ProcessInstance processInstance,
+                                     @NonNull List<IsolationTaskExcelParseVO> voList);
+
+    void checkCanOnlineTaskIsolation(@NonNull User loginUser,
+                                     long projectCode,
+                                     @NonNull ProcessInstance processInstance,
+                                     @NonNull IsolationTask isolationTask);
+
+    void checkCanListingTaskIsolation(@NonNull User loginUser,
+                                      long projectCode);
+
+    void checkCanDeleteTaskIsolation(@NonNull User loginUser,
+                                     long projectCode,
+                                     long isolationId);
+
+    void checkCanCancelTaskIsolation(@NonNull User loginUser,
+                                     long projectCode,
+                                     ProcessInstance processInstance,
+                                     @NonNull IsolationTask isolationTasks);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
new file mode 100644
index 0000000000..99402f7fb6
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
@@ -0,0 +1,206 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_CANCEL;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_DELETE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_LIST;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_ONLINE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_SUBMIT;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
+
+@Component
+public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private ProcessDefinitionLogDao processDefinitionLogDao;
+
+    @Autowired
+    private WorkflowDAGService workflowDAGService;
+
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
+    @Override
+    public void checkCanSubmitTaskIsolation(@NonNull User loginUser,
+                                            long projectCode,
+                                            ProcessInstance processInstance,
+                                            @NonNull List<IsolationTaskExcelParseVO> voList) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+        }
+
+        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
+                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion())
+                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
+        checkSubmitIsolateTaskAuth(loginUser, project, processDefinitionLog);
+
+        DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(processDefinitionLog);
+        checkWorkflowInstanceCanSubmitIsolateTask(processInstance.getName(), processInstance);
+
+        checkTaskNodeCanSubmitIsolateTask(voList, workflowDAG);
+    }
+
+    @Override
+    public void checkCanOnlineTaskIsolation(@NonNull User loginUser,
+                                            long projectCode,
+                                            ProcessInstance processInstance,
+                                            @NonNull IsolationTask isolationTask) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        checkOnlineIsolationTaskAuth(loginUser, project, processInstance);
+
+        if (IsolationTaskStatus.ONLINE.getCode() == isolationTask.getStatus()) {
+            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE);
+        }
+
+        String workflowInstanceName = isolationTask.getWorkflowInstanceName();
+        checkWorkflowInstanceCanSubmitIsolateTask(workflowInstanceName, processInstance);
+    }
+
+    @Override
+    public void checkCanListingTaskIsolation(@NonNull User loginUser, long projectCode) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+
+        projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_LIST);
+    }
+
+    @Override
+    public void checkCanDeleteTaskIsolation(@NonNull User loginUser,
+                                            long projectCode,
+                                            long isolationId) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_DELETE);
+
+        IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
+                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
+        if (isolationTask.getStatus() != IsolationTaskStatus.OFFLINE.getCode()) {
+            throw new ServiceException(ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE);
+        }
+    }
+
+    @Override
+    public void checkCanCancelTaskIsolation(@NonNull User loginUser,
+                                            long projectCode,
+                                            ProcessInstance processInstance,
+                                            @NonNull IsolationTask isolationTask) {
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+        }
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        checkCancelIsolationTaskAuth(loginUser, project, processInstance);
+
+        if (isolationTask.getStatus() == IsolationTaskStatus.OFFLINE.getCode()) {
+            throw new ServiceException(ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL);
+        }
+    }
+
+    private void checkWorkflowInstanceCanSubmitIsolateTask(@NonNull String workflowInstanceName,
+                                                           ProcessInstance processInstance) {
+        // check if the given workflow instance can do isolate operation
+        // If the workflow instance is at ready_xx status, it cannot do isolate operation
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceName);
+        }
+        if (processInstance.getState().typeIsReady()) {
+            throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT, workflowInstanceName);
+        }
+    }
+
+    private void checkTaskNodeCanSubmitIsolateTask(@NonNull List<IsolationTaskExcelParseVO> voList,
+                                                   @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+        for (IsolationTaskExcelParseVO vo : voList) {
+            String taskCodeStr = Long.toString(vo.getTaskCode());
+            // check if the taskNode exist in DAG
+            if (!workflowDAG.containsNode(taskCodeStr)) {
+                throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST, vo.getTaskName());
+            }
+            // check if the pre task exist an online isolation task
+            // if existed, we cannot create a new isolation task
+            Set<String> allPreNodes = DagHelper.getAllPreNodes(taskCodeStr, workflowDAG);
+            List<IsolationTask> isolationTasks = isolationTaskDao.queryByTaskCodes(vo.getWorkflowInstanceId(),
+                    allPreNodes.stream().map(Long::parseLong).collect(Collectors.toList()));
+            // todo: Do we need to support if the sub isolation task is offline?
+            if (CollectionUtils.isNotEmpty(isolationTasks)) {
+                throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK);
+            }
+        }
+    }
+
+    private void checkSubmitIsolateTaskAuth(@NonNull User loginUser,
+                                            @NonNull Project project,
+                                            @NonNull ProcessDefinitionLog processDefinitionLog) {
+        if (processDefinitionLog.getProjectCode() != project.getCode()) {
+            throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
+        }
+        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_SUBMIT);
+    }
+
+    private void checkOnlineIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+        }
+        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
+                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion())
+                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
+        if (processDefinitionLog.getProjectCode() != project.getCode()) {
+            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
+        }
+
+        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_ONLINE);
+    }
+
+    private void checkCancelIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+        }
+        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
+                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion())
+                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
+        if (processDefinitionLog.getProjectCode() != project.getCode()) {
+            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
+        }
+        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_CANCEL);
+
+    }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java
index 1805934628..05e708d3b0 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java
@@ -2,11 +2,11 @@ package org.apache.dolphinscheduler.api.checker;
 
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ProjectService;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -14,13 +14,12 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
 import static org.apache.dolphinscheduler.api.enums.Status.CLEAN_TASK_INSTANCE_ERROR_WORKFLOW_INSTANCE_IS_RUNNING;
 import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
 import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
-import static org.apache.dolphinscheduler.api.enums.Status.PROJECT_NOT_EXIST;
 
 @Component
 public class WorkflowInstanceCheckerImpl implements WorkflowInstanceChecker {
 
     @Autowired
-    private ProcessDefinitionDao processDefinitionDao;
+    private ProcessDefinitionLogDao processDefinitionLogDao;
 
     @Autowired
     private ProjectService projectService;
@@ -30,16 +29,13 @@ public class WorkflowInstanceCheckerImpl implements WorkflowInstanceChecker {
         if (processInstance == null) {
             throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
         }
-        // todo: check permission
-        ProcessDefinition processDefinition =
-                processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode());
-        if (processDefinition == null) {
-            throw new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getProcessDefinitionCode());
-        }
-        Project project = projectService.queryByCode(loginUser, processDefinition.getProjectCode());
-        if (project == null) {
-            throw new ServiceException(PROJECT_NOT_EXIST);
-        }
+        ProcessDefinitionLog processDefinitionLog =
+                processDefinitionLogDao
+                        .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+                                processInstance.getProcessDefinitionVersion())
+                        .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
+                                processInstance.getProcessDefinitionCode()));
+        Project project = projectService.queryByCode(loginUser, processDefinitionLog.getProjectCode());
         projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CLEAN_TASK_INSTANCE_STATE);
 
         // check state
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
index fba7677ab7..c8cdd2cb58 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
@@ -96,6 +96,11 @@ public class ApiFuncIdentificationConstant {
     public static final String INSTANCE_BATCH_DELETE = "project:process-instance:batch-delete";
     public static final String FORCED_SUCCESS = "project:task-instance:force-success";
     public static final String CLEAN_TASK_INSTANCE_STATE = "project:task-instance:clean-state";
+    public static final String ISOLATION_TASK_LIST = "project:isolation-task:list";
+    public static final String ISOLATION_TASK_ONLINE = "project:isolation-task:online";
+    public static final String ISOLATION_TASK_SUBMIT = "project:isolation-task:submit";
+    public static final String ISOLATION_TASK_CANCEL = "project:isolation-task:cancel";
+    public static final String ISOLATION_TASK_DELETE = "project:isolation-task:delete";
     public static final String VIEW_LOG = "project:log:detail";
     public static final String DOWNLOAD_LOG = "project:log:download-log";
     public static final String PROJECT_OVERVIEW = "project:overview:view";
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
new file mode 100644
index 0000000000..f4b445456d
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
@@ -0,0 +1,96 @@
+package org.apache.dolphinscheduler.api.controller;
+
+import io.swagger.annotations.Api;
+import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.service.IsolationTaskService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+import springfox.documentation.annotations.ApiIgnore;
+
+import javax.validation.Valid;
+
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_LISTING_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR;
+
+@Api(tags = "ISOLATION_TASK_TAG")
+@RestController
+@RequestMapping("/projects/{projectCode}/isolation-task")
+public class IsolationTaskController {
+
+    @Autowired
+    private IsolationTaskService isolationTaskService;
+
+    @PostMapping(value = "/submit")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> submitIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                            @PathVariable long projectCode,
+                                            @RequestBody IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
+        isolationTaskService.submitTaskIsolations(loginUser, projectCode, isolationTaskSubmitRequest);
+        return Result.success(null);
+    }
+
+    @PutMapping(value = "/online/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> onlineIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                            @PathVariable long projectCode,
+                                            @PathVariable(name = "id") long isolationId) {
+        isolationTaskService.onlineTaskIsolation(loginUser, projectCode, isolationId);
+        return Result.success(null);
+    }
+
+    @PutMapping(value = "/cancel/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> cancelIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                            @PathVariable long projectCode,
+                                            @PathVariable(name = "id") long isolationId) {
+        isolationTaskService.cancelTaskIsolation(loginUser, projectCode, isolationId);
+        return Result.success(null);
+    }
+
+    @GetMapping("")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(ISOLATION_TASK_LISTING_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<PageInfo<IsolationTask>> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                                @PathVariable long projectCode,
+                                                                @RequestBody @Valid IsolationTaskListingRequest request) {
+        return Result.success(isolationTaskService.listingTaskIsolation(loginUser, projectCode, request));
+    }
+
+    @DeleteMapping("/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(ISOLATION_TASK_DELETE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> deleteIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                            @PathVariable long projectCode,
+                                            @PathVariable long id) {
+        isolationTaskService.deleteTaskIsolation(loginUser, projectCode, id);
+        return Result.success(null);
+    }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
index c9afc8fabd..3fe81f7a71 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import javax.validation.constraints.NotEmpty;
 import java.util.List;
 
 @Data
@@ -11,5 +12,6 @@ import java.util.List;
 @AllArgsConstructor
 public class CleanTaskInstanceStateRequest {
 
+    @NotEmpty
     private List<Integer> taskInstanceIds;
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java
similarity index 60%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java
index c9afc8fabd..a7e00677b5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java
@@ -4,12 +4,14 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import javax.validation.constraints.NotEmpty;
 import java.util.List;
 
 @Data
-@NoArgsConstructor
 @AllArgsConstructor
-public class CleanTaskInstanceStateRequest {
+@NoArgsConstructor
+public class IsolationTaskCancelRequest {
 
-    private List<Integer> taskInstanceIds;
+    @NotEmpty
+    private List<Long> isolationIds;
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java
similarity index 60%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java
index c9afc8fabd..82104c3c9d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java
@@ -4,12 +4,14 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import javax.validation.constraints.NotEmpty;
 import java.util.List;
 
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
-public class CleanTaskInstanceStateRequest {
+public class IsolationTaskDeleteRequest {
 
-    private List<Integer> taskInstanceIds;
+    @NotEmpty
+    private List<Long> isolationIds;
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java
new file mode 100644
index 0000000000..fecd65e9c6
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IsolationTaskListingRequest {
+
+    private String workflowInstanceName;
+
+    private String taskName;
+
+    @NotNull(message = "page number cannot be null")
+    private Integer pageNo;
+
+    @NotNull(message = "page size cannot be null")
+    private Integer pageSize;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java
new file mode 100644
index 0000000000..d89b681eaf
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+
+import javax.validation.constraints.NotEmpty;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IsolationTaskSubmitRequest {
+
+    @NotEmpty
+    private List<IsolationTaskExcelParseVO> isolationTaskExcelParseVOList;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 37f89b406f..602cb477dd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -513,7 +513,44 @@ public enum Status {
     CONFIG_FILTER_EMPTY(1400007, "complement time is empty after filtering according to the configuration",
             "当前补数时间根据配置过滤后为空"),
     PROJECT_NAME_TOO_LONG_ERROR(1400008, "project name is too long error", "项目名称过长"),
-    ;
+
+    ISOLATION_TASK_NOT_EXIST(1500000, "Isolation task not exist", "隔离任务不存在"),
+    ISOLATION_TASK_SUBMIT_ERROR(1500100, "Submit isolation task error", "提交隔离任务异常"),
+    ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT(1500101,
+            "Submit isolation task error, relate workflow instance [{0}] is not support",
+            "提交隔离任务异常, 关联的工作流实例:[{0}]暂不支持该操作"),
+    ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT(1500102,
+            "Submit isolation task error, relate workflow instance [{0}] is not belong to current project",
+            "提交隔离任务异常, 关联的工作流实例:[{0}]不属于当前项目"),
+    ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST(1500103, "Submit isolation task error, task: [{0}] is not exist",
+            "提交隔离任务异常, 任务不存在:[{0}]"),
+    ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK(1500104,
+            "Submit isolation task error, workflow instance: [{0}] exist an sub isolation task",
+            "提交隔离任务异常, 工作流实例已经存在:[{0}]子隔离任务"),
+    ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500105,
+            "Submit isolation task error, send request to master error",
+            "提交隔离任务异常,发送请求给Master异常"),
+
+    ISOLATION_TASK_ONLINE_ERROR(1500200, "Online isolation task error", "上线隔离任务异常"),
+    ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE(1500201,
+            "Online isolation task error, the isolation task is already online", "上线隔离任务异常,该任务已处于隔离中"),
+    ISOLATION_TASK_ONLINE_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500202,
+            "Online isolation task error, send request to master error", "上线隔离任务异常,发送隔离请求给Master异常"),
+    ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR(1500203,
+            "Online isolation task error, the process instance is not belongs to project", "上线隔离任务异常,当前工作流实例不属于目标任务"),
+
+    ISOLATION_TASK_CANCEL_ERROR(1500300, "Cancel isolation task error", "取消隔离任务异常"),
+    ISOLATION_TASK_CANCEL_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500301,
+            "Cancel isolation task error, send request to master error", "取消隔离任务异常,发送隔离请求给Master异常"),
+    ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL(1500302,
+            "Cancel isolation task error, this task isolation is already been cancel", "取消隔离任务异常,该隔离已经下线"),
+    ISOLATION_TASK_CANCEL_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR(1500303,
+            "Cancel isolation task error, the process instance is not belongs to project", "取消隔离任务异常,当前工作流实例不属于目标任务"),
+    ISOLATION_TASK_LISTING_ERROR(1500400, "Listing isolation task error", "查询隔离任务列表异常"),
+    ISOLATION_TASK_DELETE_ERROR(1500500, "Delete isolation task error", "删除隔离任务异常"),
+    ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE(1500501, "Delete isolation task error, the task status is not offline",
+            "删除隔离任务异常,该隔离任务尚未下线"),
+            ;
 
     private final int code;
     private final String enMsg;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
new file mode 100644
index 0000000000..ef3634cd52
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
@@ -0,0 +1,35 @@
+package org.apache.dolphinscheduler.api.remote;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class ApiServerRPCClient {
+
+    private final NettyRemotingClient client;
+
+    private static final long DEFAULT_TIME_OUT_MILLS = 1_000L;
+
+    public ApiServerRPCClient() {
+        client = new NettyRemotingClient(new NettyClientConfig());
+        log.info("Success initialized ApiServerRPCClient...");
+    }
+
+    public void sendSyncCommand(@NonNull Host host,
+                                @NonNull Command rpcCommand) throws RemotingException, InterruptedException {
+        sendSyncCommand(host, rpcCommand, DEFAULT_TIME_OUT_MILLS);
+    }
+
+    public void sendSyncCommand(@NonNull Host host,
+                                @NonNull Command rpcCommand,
+                                long timeoutMills) throws RemotingException, InterruptedException {
+        client.sendSync(host, rpcCommand, timeoutMills);
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
new file mode 100644
index 0000000000..7225d19007
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
@@ -0,0 +1,32 @@
+package org.apache.dolphinscheduler.api.service;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskCancelRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+public interface IsolationTaskService {
+
+    void submitTaskIsolations(@NonNull User loginUser,
+                              long projectCode,
+                              @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest);
+
+    void onlineTaskIsolation(@NonNull User loginUser,
+                             long projectCode,
+                             long isolationTaskId);
+
+    void cancelTaskIsolation(@NonNull User loginUser,
+                             long projectCode,
+                             long isolationId);
+
+    PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
+                                                 long projectCode,
+                                                 @NonNull IsolationTaskListingRequest request);
+
+    void deleteTaskIsolation(@NonNull User loginUser,
+                             long projectCode,
+                             long id);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 58baef26a6..f3a9069a72 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -18,16 +18,16 @@
 
 package org.apache.dolphinscheduler.api.service;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
 /**
  * process instance service
  */
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
index 716e2fba87..64fcf8fd99 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import lombok.NonNull;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
@@ -44,6 +45,7 @@ public interface ProjectService {
      * @param projectCode project code
      * @return project detail information
      */
+    @NonNull
     Project queryByCode(User loginUser, long projectCode);
 
     /**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java
new file mode 100644
index 0000000000..739c11a616
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.api.service;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
+public interface WorkflowDAGService {
+
+    DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull Long processDefinitionCode,
+                                                           @NonNull Integer processDefinitionVersion);
+
+    DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull ProcessDefinitionLog processDefinitionLog);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
new file mode 100644
index 0000000000..4f6ff908b1
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
@@ -0,0 +1,246 @@
+package org.apache.dolphinscheduler.api.service.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.dolphinscheduler.api.checker.IsolationTaskChecker;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.remote.ApiServerRPCClient;
+import org.apache.dolphinscheduler.api.service.IsolationTaskService;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.api.transformer.CommandTransformer;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR;
+
+@Slf4j
+@Service
+public class IsolationTaskServiceImpl implements IsolationTaskService {
+
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
+    @Autowired
+    private IsolationTaskChecker isolationTaskChecker;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private CommandTransformer commandTransformer;
+
+    @Autowired
+    private ApiServerRPCClient apiServerRPCClient;
+
+    @Autowired
+    private RegistryClient registryClient;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private WorkflowDAGService workflowDAGService;
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    @Override
+    @Transactional
+    public void submitTaskIsolations(@NonNull User loginUser,
+                                     long projectCode,
+                                     @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
+        Map<Integer, List<IsolationTaskExcelParseVO>> workflow2VoMap =
+                isolationTaskSubmitRequest.getIsolationTaskExcelParseVOList()
+                        .stream().collect(
+                                HashMap::new, (map, vo) -> {
+                                    map.computeIfAbsent(vo.getWorkflowInstanceId(), k -> new ArrayList<>()).add(vo);
+                                },
+                                Map::putAll);
+
+        List<Pair<ProcessInstance, List<IsolationTask>>> needToOnlineIsolations = new ArrayList<>();
+        List<IsolationTask> needToInsertIntoDB = new ArrayList<>();
+        for (Map.Entry<Integer, List<IsolationTaskExcelParseVO>> entry : workflow2VoMap.entrySet()) {
+            Integer workflowInstanceId = entry.getKey();
+            List<IsolationTaskExcelParseVO> vos = entry.getValue();
+            ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+            isolationTaskChecker.checkCanSubmitTaskIsolation(loginUser, projectCode, processInstance, vos);
+
+            List<IsolationTask> isolationTasks = entry.getValue().stream().map(vo -> {
+                return IsolationTask.builder()
+                        .workflowInstanceId(vo.getWorkflowInstanceId())
+                        .workflowInstanceName(vo.getWorkflowInstanceName())
+                        .taskName(vo.getTaskName())
+                        .taskCode(vo.getTaskCode())
+                        .status(IsolationTaskStatus.ONLINE.getCode())
+                        .build();
+            }).collect(Collectors.toList());
+            needToInsertIntoDB.addAll(isolationTasks);
+            needToOnlineIsolations.add(Pair.of(processInstance, isolationTasks));
+        }
+        isolationTaskDao.batchInsert(needToInsertIntoDB);
+        // we split here to avoid rollback RPC request
+        try {
+            refreshIsolationTasks();
+        } catch (Exception ex) {
+            throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR);
+        }
+    }
+
+    @Override
+    @Transactional
+    public void onlineTaskIsolation(@NonNull User loginUser, long projectCode, long isolationTaskId) {
+        IsolationTask isolationTask = isolationTaskDao.queryById(isolationTaskId)
+                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
+        ProcessInstance processInstance =
+                processInstanceDao.queryProcessInstanceById(isolationTask.getWorkflowInstanceId());
+
+        isolationTaskChecker.checkCanOnlineTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
+        isolationTaskDao.updateIsolationTaskStatus(isolationTaskId, IsolationTaskStatus.ONLINE);
+        try {
+            refreshIsolationTasks();
+        } catch (Exception ex) {
+            throw new ServiceException(Status.ISOLATION_TASK_ONLINE_ERROR);
+        }
+    }
+
+    @Override
+    @Transactional
+    public void cancelTaskIsolation(@NonNull User loginUser,
+                                    long projectCode,
+                                    long isolationId) {
+        IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
+                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
+
+        Integer workflowInstanceId = isolationTask.getWorkflowInstanceId();
+        ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+        isolationTaskChecker.checkCanCancelTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
+        isolationTaskDao.updateIsolationTaskStatus(isolationTask.getId(), IsolationTaskStatus.OFFLINE);
+        insertRecoveryCommandIfNeeded(processInstance, isolationTask);
+        try {
+            refreshIsolationTasks();
+        } catch (RemotingException | InterruptedException e) {
+            throw new ServiceException(Status.ISOLATION_TASK_CANCEL_ERROR);
+        }
+    }
+
+    @Override
+    public PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
+                                                        long projectCode,
+                                                        @NonNull IsolationTaskListingRequest request) {
+        isolationTaskChecker.checkCanListingTaskIsolation(loginUser, projectCode);
+
+        Integer pageNo = request.getPageNo();
+        Integer pageSize = request.getPageSize();
+
+        IPage<IsolationTask> iPage = isolationTaskDao.pageQueryIsolationTask(
+                request.getWorkflowInstanceName(),
+                request.getTaskName(),
+                pageNo,
+                pageSize);
+
+        PageInfo<IsolationTask> pageInfo = new PageInfo<>(pageNo, pageSize);
+        pageInfo.setTotal((int) iPage.getTotal());
+        pageInfo.setTotalList(iPage.getRecords());
+        return pageInfo;
+    }
+
+    @Override
+    public void deleteTaskIsolation(@NonNull User loginUser, long projectCode, long id) {
+        isolationTaskChecker.checkCanDeleteTaskIsolation(loginUser, projectCode, id);
+        int deleteNum = isolationTaskDao.deleteByIdAndStatus(id, IsolationTaskStatus.OFFLINE);
+        if (deleteNum <= 0) {
+            throw new ServiceException(ISOLATION_TASK_NOT_EXIST);
+        }
+    }
+
+    private void refreshIsolationTasks() throws RemotingException, InterruptedException {
+        List<Server> masters = registryClient.getServerList(NodeType.MASTER);
+        if (CollectionUtils.isEmpty(masters)) {
+            return;
+        }
+
+        org.apache.dolphinscheduler.remote.command.Command refreshIsolationRequest =
+                new RefreshIsolationTaskRequest().convert2Command();
+        for (Server master : masters) {
+            try {
+                apiServerRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
+                        refreshIsolationRequest);
+            } catch (RemotingException | InterruptedException e) {
+                log.error("Send RefreshIsolationTask request to master error, master: {}", master, e);
+                throw e;
+            }
+        }
+    }
+
+    private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance,
+                                               @NonNull IsolationTask isolationTask) {
+        if (processInstance.getState() != ExecutionStatus.PAUSE_BY_ISOLATION) {
+            return;
+        }
+        int workflowInstanceId = processInstance.getId();
+        // find the isolationTaskInstanceIds need to recovery
+        // find the sub node is in pause or kill
+        DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(
+                processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion());
+
+        List<TaskInstance> taskInstances =
+                taskInstanceDao.queryValidatedTaskInstanceByWorkflowInstanceId(workflowInstanceId);
+        Set<String> onlineIsolationTaskCodes =
+                isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId, IsolationTaskStatus.ONLINE)
+                        .stream()
+                        .map(onlineIsolationTask -> String.valueOf(onlineIsolationTask.getTaskCode()))
+                        .collect(Collectors.toSet());
+
+        List<TaskInstance> canRecoveryTaskInstances = taskInstances.stream()
+                .filter(taskInstance -> taskInstance.getState().typeIsIsolated())
+                .filter(taskInstance -> !DagHelper.isChildOfAnyParentNodes(String.valueOf(taskInstance.getTaskCode()),
+                        onlineIsolationTaskCodes, workflowDAG))
+                .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(canRecoveryTaskInstances)) {
+            return;
+        }
+        // find if this taskInstance still exist pre isolationTasks
+        Command command = commandTransformer.transformToRecoveryFromTaskIsolationCommand(processInstance,
+                canRecoveryTaskInstances);
+        processService.createCommand(command);
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index 7f5bdf086b..3720956c45 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service.impl;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.NonNull;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
@@ -168,7 +169,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
      * @return project detail information
      */
     @Override
-    public Project queryByCode(User loginUser, long projectCode) {
+    public @NonNull Project queryByCode(User loginUser, long projectCode) {
         Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByCode(projectCode);
         hasProjectAndPerm(loginUser, project, result, PROJECT);
@@ -204,10 +205,8 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
                 perm)) {
             // check read permission
             Project checkProject = projectMapper.queryByCode(projectCode);
-            putMsg(result, Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(),
+            throw new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(),
                     Objects.nonNull(checkProject) ? project.getName() : projectCode);
-        } else {
-            putMsg(result, Status.SUCCESS);
         }
     }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index f6da23ac0f..9d1dc166a2 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.api.checker.WorkflowInstanceChecker;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
@@ -34,6 +35,7 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -41,6 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -89,6 +92,12 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
     @Autowired
     private TaskInstanceDao taskInstanceDao;
 
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private WorkflowInstanceChecker workflowInstanceChecker;
+
     @Autowired
     private CommandTransformer commandTransformer;
 
@@ -219,7 +228,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
             List<TaskInstance> existTaskInstances = taskInstanceDao.queryTaskInstanceByIds(taskInstanceIds);
             TaskInstanceValidator.validateTaskInstanceAllExists(taskInstanceIds, existTaskInstances);
 
-            Map<Integer, List<Integer>> processInstanceId2TaskInstanceIds =
+            Map<Integer, List<Integer>> processInstanceId2TaskInstanceIdMap =
                     existTaskInstances.stream().collect(
                             HashMap::new,
                             (map, taskInstance) -> {
@@ -228,11 +237,17 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
                             },
                             Map::putAll);
 
-            List<Command> cleanStateCommands = commandTransformer.transformToCleanTaskInstanceStateCommands(loginUser,
-                    processInstanceId2TaskInstanceIds);
-
             // todo: use batch create and remove the transactional
-            cleanStateCommands.forEach(command -> processService.createCommand(command));
+            for (Map.Entry<Integer, List<Integer>> processInstanceId2TaskInstanceIds : processInstanceId2TaskInstanceIdMap
+                    .entrySet()) {
+                Integer workflowInstanceId = processInstanceId2TaskInstanceIds.getKey();
+                List<Integer> needToCleanStateTaskInstanceIds = processInstanceId2TaskInstanceIds.getValue();
+                ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+                workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, workflowInstance);
+                Command command = commandTransformer.transformToCleanTaskInstanceStateCommand(workflowInstance,
+                        needToCleanStateTaskInstanceIds);
+                processService.createCommand(command);
+            }
         } catch (ServiceException serviceException) {
             throw serviceException;
         } catch (Exception ex) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java
new file mode 100644
index 0000000000..f1ffb0f64d
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java
@@ -0,0 +1,64 @@
+package org.apache.dolphinscheduler.api.service.impl;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
+
+@Service
+public class WorkflowDagServiceImpl implements WorkflowDAGService {
+
+    @Autowired
+    private ProcessDefinitionLogDao processDefinitionLogDao;
+
+    // todo: use dao
+    @Autowired
+    private ProcessService processService;
+
+    @Override
+    public DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull Long processDefinitionCode,
+                                                                  @NonNull Integer processDefinitionVersion) {
+        ProcessDefinitionLog processDefinitionLog =
+                processDefinitionLogDao.queryProcessDefinitionByCode(processDefinitionCode, processDefinitionVersion)
+                        .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processDefinitionCode));
+        return getWorkflowDAG(processDefinitionLog);
+    }
+
+    @Override
+    public DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull ProcessDefinitionLog processDefinitionLog) {
+        List<ProcessTaskRelation> processTaskRelations =
+                processService.findRelationByCode(processDefinitionLog.getCode(), processDefinitionLog.getVersion());
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                processService.getTaskDefineLogListByRelation(processTaskRelations);
+        List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
+
+        ProcessDag processDag = DagHelper.generateFlowDag(
+                taskNodeList,
+                Collections.emptyList(),
+                Collections.emptyList(),
+                TaskDependType.TASK_POST);
+        if (processDag == null) {
+            throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY);
+        }
+
+        return DagHelper.buildDagGraph(processDag);
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java
index 51865effaf..986c2b99a6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java
@@ -2,17 +2,16 @@ package org.apache.dolphinscheduler.api.transformer;
 
 import lombok.NonNull;
 import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 
 import java.util.List;
-import java.util.Map;
 
 public interface CommandTransformer {
 
-    List<Command> transformToCleanTaskInstanceStateCommands(@NonNull User loginUser,
-                                                            @NonNull Map<Integer, List<Integer>> workflowInstanceId2TaskInstanceIds);
+    Command transformToCleanTaskInstanceStateCommand(@NonNull ProcessInstance processInstance,
+                                                     @NonNull List<Integer> needCleanTaskInstanceIds);
 
-    Command transformToCleanTaskInstanceStateCommand(@NonNull User loginUser,
-                                                     @NonNull Integer workflowInstanceId,
-                                                     @NonNull List<Integer> needCleanTaskInstances);
+    Command transformToRecoveryFromTaskIsolationCommand(@NonNull ProcessInstance processInstance,
+                                                        @NonNull List<TaskInstance> canRecoveryTaskInstances);
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java
index 2a0e46033e..9d8aadff43 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java
@@ -1,53 +1,59 @@
 package org.apache.dolphinscheduler.api.transformer;
 
 import lombok.NonNull;
-import org.apache.dolphinscheduler.api.checker.WorkflowInstanceChecker;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.springframework.stereotype.Component;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS;
 
 @Component
 public class CommandTransformerImpl implements CommandTransformer {
 
-    @Autowired
-    private ProcessInstanceDao processInstanceDao;
+    @Override
+    public Command transformToCleanTaskInstanceStateCommand(@NonNull ProcessInstance processInstance,
+                                                            @NonNull List<Integer> needCleanTaskInstanceIds) {
 
-    @Autowired
-    private WorkflowInstanceChecker workflowInstanceChecker;
+        Command command = new Command();
+        command.setCommandType(CommandType.START_FROM_STATE_CLEAN_TASKS);
+        command.setExecutorId(processInstance.getExecutorId());
+        command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+        command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
+        // we set the task post strategy, since we will go through the post task if the post task is not running
+        command.setTaskDependType(TaskDependType.TASK_POST);
+        command.setFailureStrategy(processInstance.getFailureStrategy());
+        command.setWarningType(processInstance.getWarningType());
+        command.setWarningGroupId(processInstance.getWarningGroupId());
+        command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
+        command.setWorkerGroup(processInstance.getWorkerGroup());
+        command.setEnvironmentCode(processInstance.getEnvironmentCode());
+        command.setDryRun(processInstance.getDryRun());
 
-    @Override
-    public List<Command> transformToCleanTaskInstanceStateCommands(@NonNull User loginUser,
-                                                                   @NonNull Map<Integer, List<Integer>> workflowInstanceId2TaskInstanceIds) {
-        List<Command> commands = new ArrayList<>(workflowInstanceId2TaskInstanceIds.size());
-        workflowInstanceId2TaskInstanceIds.forEach((workflowInstanceId, taskInstanceIds) -> {
-            Command command = transformToCleanTaskInstanceStateCommand(loginUser, workflowInstanceId, taskInstanceIds);
-            commands.add(command);
-        });
-        return commands;
+        // todo:use pojo to represent CommandParam rather than map
+        Map<String, String> commandParamMap = new HashMap<>();
+        commandParamMap.put(CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS, JSONUtils.toJsonString(needCleanTaskInstanceIds));
+        command.setCommandParam(JSONUtils.toJsonString(commandParamMap));
+        command.setProcessInstanceId(processInstance.getId());
+        return command;
     }
 
     @Override
-    public Command transformToCleanTaskInstanceStateCommand(@NonNull User loginUser,
-                                                            @NonNull Integer workflowInstanceId,
-                                                            @NonNull List<Integer> needCleanTaskInstances) {
-        ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
-        workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, processInstance);
-
+    public Command transformToRecoveryFromTaskIsolationCommand(@NonNull ProcessInstance processInstance,
+                                                               @NonNull List<TaskInstance> canRecoveryIsolationTaskInstances) {
         Command command = new Command();
-        command.setCommandType(CommandType.START_FROM_STATE_CLEAN_TASKS);
+        command.setCommandType(CommandType.RECOVERY_FROM_ISOLATION_TASKS);
+
         command.setExecutorId(processInstance.getExecutorId());
         command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
         command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
@@ -62,10 +68,22 @@ public class CommandTransformerImpl implements CommandTransformer {
         command.setDryRun(processInstance.getDryRun());
 
         // todo:use pojo to represent CommandParam rather than map
+        List<Integer> recoveryPausedIsolationIds = canRecoveryIsolationTaskInstances.stream()
+                .filter(taskInstance -> taskInstance.getState().typeIsPauseByIsolation())
+                .map(TaskInstance::getId)
+                .collect(Collectors.toList());
+        List<Integer> recoveryKilledIsolationIds = canRecoveryIsolationTaskInstances.stream()
+                .filter(taskInstance -> taskInstance.getState().typeIsKilledByIsolation())
+                .map(TaskInstance::getId)
+                .collect(Collectors.toList());
         Map<String, String> commandParamMap = new HashMap<>();
-        commandParamMap.put(CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS, JSONUtils.toJsonString(needCleanTaskInstances));
+        commandParamMap.put(CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS,
+                JSONUtils.toJsonString(recoveryPausedIsolationIds));
+        commandParamMap.put(CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS,
+                JSONUtils.toJsonString(recoveryKilledIsolationIds));
         command.setCommandParam(JSONUtils.toJsonString(commandParamMap));
         command.setProcessInstanceId(processInstance.getId());
         return command;
     }
+
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java
new file mode 100644
index 0000000000..be63400aed
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java
@@ -0,0 +1,25 @@
+package org.apache.dolphinscheduler.api.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IsolationTaskExcelParseVO {
+
+    @NotNull(message = "Workflow instance id cannot be null")
+    private Integer workflowInstanceId;
+
+    @NotNull(message = "Workflow instance name cannot be null")
+    private String workflowInstanceName;
+
+    @NotNull(message = "Task code cannot be null")
+    private Long taskCode;
+
+    @NotNull(message = "Task name cannot be null")
+    private String taskName;
+}
diff --git a/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx b/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx
new file mode 100644
index 0000000000..f334cc6e5e
Binary files /dev/null and b/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx differ
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 0499d43c30..efffbc3842 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -338,6 +338,9 @@ public final class Constants {
 
     public static final String CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS = "CleanStateTaskInstanceIds";
 
+    public static final String CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS = "RecoveryPausedIsolationTaskInstanceIds";
+    public static final String CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS = "RecoveryKilledIsolationTaskInstanceIds";
+
     public static final String CMD_PARAM_START_PARAMS = "StartParams";
 
     public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
@@ -859,5 +862,5 @@ public final class Constants {
      * file upload verify
      */
     public static final String FILE_TYPE_RESTRICTED_LIST = "file.type.restricted.list";
-    public static final String FILE_NAME_RESTRICTED_CONTENT= "file.name.restricted.content";
+    public static final String FILE_NAME_RESTRICTED_CONTENT = "file.name.restricted.content";
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index f9bc11fee3..62d538a58b 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -55,6 +55,8 @@ public enum CommandType {
     RECOVER_SERIAL_WAIT(11, "recover serial wait"),
     START_FROM_STATE_CLEAN_TASKS(12, "start from state clean tasks"),
 
+    RECOVERY_FROM_ISOLATION_TASKS(13, "recovery from isolation tasks"),
+
     ;
 
     CommandType(int code, String descp) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
new file mode 100644
index 0000000000..dcc93cc64b
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
@@ -0,0 +1,27 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+public enum IsolationTaskStatus {
+
+    ONLINE(0),
+    OFFLINE(1),
+    ;
+
+    private final int code;
+
+    IsolationTaskStatus(int code) {
+        this.code = code;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public static IsolationTaskStatus of(int code) {
+        for (IsolationTaskStatus value : IsolationTaskStatus.values()) {
+            if (value.getCode() == code) {
+                return value;
+            }
+        }
+        throw new IllegalArgumentException(String.format("Isolation task status code: %s is invalidated: ", code));
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
new file mode 100644
index 0000000000..8a852aace5
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
@@ -0,0 +1,37 @@
+package org.apache.dolphinscheduler.dao.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Date;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("t_ds_isolation_task")
+public class IsolationTask {
+
+    @TableId(value = "id", type = IdType.AUTO)
+    private Long id;
+
+    private int workflowInstanceId;
+
+    private String workflowInstanceName;
+
+    private String taskName;
+
+    private long taskCode;
+
+    private int status;
+
+    private Date createTime;
+
+    private Date updateTime;
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
new file mode 100644
index 0000000000..babb00c5bb
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
@@ -0,0 +1,32 @@
+package org.apache.dolphinscheduler.dao.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface IsolationTaskMapper extends BaseMapper<IsolationTask> {
+
+    IPage<IsolationTask> pageQuery(@Param("workflowInstanceName") String workflowInstanceName,
+                                   @Param("taskName") String taskName,
+                                   IPage<IsolationTask> page);
+
+    List<IsolationTask> queryByTaskCodes(@NonNull @Param("workflowInstanceId") Integer workflowInstanceId,
+                                         @NonNull @Param("taskCodes") List<Long> taskCodes);
+
+    void updateIsolationTaskStatus(@Param("id") long isolationTaskId,
+                                   @Param("status") int status);
+
+    int deleteByIdAndStatus(@Param("id") long isolationTaskId,
+                            @Param("status") int status);
+
+    List<IsolationTask> queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId,
+                                                  @Param("status") int status);
+
+    void batchInsert(@Param("isolationTasks") List<IsolationTask> isolationTasks);
+
+    List<IsolationTask> queryByStatus(@Param("status") int code);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
new file mode 100644
index 0000000000..de16a92be0
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
@@ -0,0 +1,39 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public interface IsolationTaskDao {
+
+    /**
+     * Page query, pageNumber is start from 0.
+     */
+    IPage<IsolationTask> pageQueryIsolationTask(String workflowInstanceName,
+                                                String taskName,
+                                                int pageNumber,
+                                                int pageSize);
+
+    List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes);
+
+    List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId, IsolationTaskStatus isolationTaskStatus);
+
+    Optional<IsolationTask> queryById(long isolationTaskId);
+
+    List<IsolationTask> queryByIds(List<Long> isolationTaskIds);
+
+    List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus);
+
+    int deleteByIdAndStatus(long id, IsolationTaskStatus status);
+
+    void insert(IsolationTask isolationTaskDTO);
+
+    void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus);
+
+    void batchInsert(List<IsolationTask> isolationTasks);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
new file mode 100644
index 0000000000..a8eba87989
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
+import java.util.Optional;
+
+public interface ProcessDefinitionLogDao {
+
+    Optional<ProcessDefinitionLog> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode,
+                                                                @NonNull Integer processDefinitionVersion);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index f218510191..e789101783 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -38,4 +38,5 @@ public interface ProcessInstanceDao {
      * @param processInstance processInstance
      */
     int upsertProcessInstance(ProcessInstance processInstance);
+
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
new file mode 100644
index 0000000000..6a606e3aef
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
@@ -0,0 +1,11 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+
+import java.util.List;
+
+public interface ProcessTaskRelationLogDao {
+
+    List<ProcessTaskRelationLog> queryProcessTaskRelationLogByWorkflow(long workflowDefinitionCode,
+                                                                       int workflowDefinitionVersion);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
new file mode 100644
index 0000000000..5c8d55b005
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+
+import java.util.List;
+
+public interface TaskDefinitionLogDao {
+
+    List<TaskDefinitionLog> queryTaskDefinitionByRelations(List<ProcessTaskRelationLog> processTaskRelations);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index c5b75fb0d9..3918ad8386 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -10,6 +10,8 @@ public interface TaskInstanceDao {
 
     List<TaskInstance> queryTaskInstanceByIds(List<Integer> taskInstanceId);
 
+    List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId);
+
     /**
      * Update the taskInstance, if update failed will throw exception.
      *
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
new file mode 100644
index 0000000000..effab3aa16
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
@@ -0,0 +1,86 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+@Slf4j
+@Repository
+public class IsolationTaskDaoImpl implements IsolationTaskDao {
+
+    @Autowired
+    private IsolationTaskMapper isolationTaskMapper;
+
+    @Override
+    public IPage<IsolationTask> pageQueryIsolationTask(String workflowInstanceName,
+                                                       String taskName,
+                                                       int pageNumber,
+                                                       int pageSize) {
+        Page<IsolationTask> page = new Page<>(pageNumber, pageSize);
+        return isolationTaskMapper.pageQuery(workflowInstanceName, taskName, page);
+    }
+
+    @Override
+    public List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes) {
+        if (CollectionUtils.isEmpty(taskCodes)) {
+            return Collections.emptyList();
+        }
+        return isolationTaskMapper.queryByTaskCodes(workflowInstanceId, taskCodes);
+    }
+
+    @Override
+    public List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId,
+                                                         IsolationTaskStatus isolationTaskStatus) {
+        return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId, isolationTaskStatus.getCode());
+    }
+
+    @Override
+    public Optional<IsolationTask> queryById(long isolationTaskId) {
+        return Optional.ofNullable(isolationTaskMapper.selectById(isolationTaskId));
+    }
+
+    @Override
+    public List<IsolationTask> queryByIds(List<Long> isolationTaskIds) {
+        return isolationTaskMapper.selectBatchIds(isolationTaskIds);
+    }
+
+    @Override
+    public List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus) {
+        return isolationTaskMapper.queryByStatus(isolationTaskStatus.getCode());
+    }
+
+    @Override
+    public int deleteByIdAndStatus(long id, IsolationTaskStatus status) {
+        return isolationTaskMapper.deleteByIdAndStatus(id, status.getCode());
+    }
+
+    @Override
+    public void insert(IsolationTask isolationTask) {
+        isolationTaskMapper.insert(isolationTask);
+    }
+
+    @Override
+    public void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus) {
+        isolationTaskMapper.updateIsolationTaskStatus(isolationTaskId, isolationTaskStatus.getCode());
+    }
+
+    @Override
+    public void batchInsert(List<IsolationTask> isolationTasks) {
+        if (CollectionUtils.isEmpty(isolationTasks)) {
+            return;
+        }
+        isolationTaskMapper.batchInsert(isolationTasks);
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
new file mode 100644
index 0000000000..1c77aea33f
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
@@ -0,0 +1,25 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.Optional;
+
+@Repository
+public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao {
+
+    @Autowired
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
+    @Override
+    public Optional<ProcessDefinitionLog> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode,
+                                                                       @NonNull Integer processDefinitionVersion) {
+        return Optional.ofNullable(
+                processDefinitionLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode,
+                        processDefinitionVersion));
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
new file mode 100644
index 0000000000..9cdf6c7de6
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+@Repository
+public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao {
+
+    @Autowired
+    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+
+    @Override
+    public List<ProcessTaskRelationLog> queryProcessTaskRelationLogByWorkflow(
+                                                                              long workflowDefinitionCode,
+                                                                              int workflowDefinitionVersion) {
+        return processTaskRelationLogMapper.queryByProcessCodeAndVersion(workflowDefinitionCode,
+                workflowDefinitionVersion);
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
new file mode 100644
index 0000000000..205fc3b649
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
@@ -0,0 +1,44 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Repository
+public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao {
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    @Override
+    public List<TaskDefinitionLog> queryTaskDefinitionByRelations(List<ProcessTaskRelationLog> processTaskRelations) {
+        List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
+        if (CollectionUtils.isEmpty(processTaskRelations)) {
+            return taskDefinitionLogs;
+        }
+        Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+            if (processTaskRelation.getPreTaskCode() > 0) {
+                taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion());
+            }
+            if (processTaskRelation.getPostTaskCode() > 0) {
+                taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
+            }
+        }
+
+        taskCodeVersionMap.forEach((code, version) -> {
+            taskDefinitionLogs.add(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version));
+        });
+        return taskDefinitionLogs;
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index 510b5d7a71..0512980b10 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -3,6 +3,7 @@ package org.apache.dolphinscheduler.dao.repository.impl;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.exception.RepositoryException;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@@ -28,6 +29,11 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
         return taskInstanceMapper.selectBatchIds(taskInstanceIds);
     }
 
+    @Override
+    public List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId) {
+        return taskInstanceMapper.findValidTaskListByProcessId(workflowInstanceId, Flag.YES);
+    }
+
     @Override
     public void updateTaskInstance(@NonNull TaskInstance taskInstance) throws RepositoryException {
         try {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index 3f25bb0f49..3ab01171e1 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -17,7 +17,9 @@
 
 package org.apache.dolphinscheduler.dao.utils;
 
+import com.google.common.collect.Sets;
 import lombok.NonNull;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -30,9 +32,11 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.commons.collections.CollectionUtils;
-
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -41,12 +45,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
 /**
  * dag tools
  */
@@ -591,4 +589,89 @@ public class DagHelper {
         }
         return false;
     }
+
+    /**
+     * Get all post nodes of the given node.
+     * <p>
+     * e.g. the given DAG is A -> B -> C, and taskNode is A, return result is A,B,C
+     */
+    public static Set<String> getAllPostNodes(@NonNull String taskNode,
+                                              @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+        Set<String> result = Sets.newHashSet();
+        Set<String> currentLoopNodes = Sets.newHashSet(taskNode);
+        Set<String> visitedNodes = new HashSet<>();
+        while (!currentLoopNodes.isEmpty()) {
+            Set<String> tmp = new HashSet<>();
+            for (String currentNode : currentLoopNodes) {
+                result.add(currentNode);
+                Set<String> subsequentNodes = workflowDAG.getSubsequentNodes(currentNode);
+                visitedNodes.add(currentNode);
+                subsequentNodes.forEach(s -> {
+                    if (!visitedNodes.contains(s)) {
+                        tmp.add(s);
+                    }
+                });
+            }
+            currentLoopNodes = tmp;
+        }
+        return result;
+    }
+
+    /**
+     * Get all pre nodes of the given node.
+     * <p>
+     * e.g. the given DAG is A -> B -> C, and taskNode is B, return result is A,B
+     */
+    public static Set<String> getAllPreNodes(@NonNull String taskNode,
+                                             @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+        Set<String> result = Sets.newHashSet();
+        Set<String> currentLoopNodes = Sets.newHashSet(taskNode);
+        Set<String> visitedNodes = new HashSet<>();
+        while (CollectionUtils.isNotEmpty(currentLoopNodes)) {
+            Set<String> tmp = new HashSet<>();
+            for (String currentNode : currentLoopNodes) {
+                result.add(currentNode);
+                visitedNodes.add(currentNode);
+                workflowDAG.getPreviousNodes(currentNode).forEach(s -> {
+                    if (!visitedNodes.contains(s)) {
+                        tmp.add(s);
+                    }
+                });
+            }
+            currentLoopNodes = tmp;
+        }
+        return result;
+    }
+
+    /**
+     * Judge if the givenTaskNode is the child of any given parentNodes or is belongs to parentNodes.
+     */
+    public static boolean isChildOfAnyParentNodes(@NonNull String givenTaskNode,
+                                                  @NonNull Set<String> parentNodes,
+                                                  @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+        if (CollectionUtils.isEmpty(parentNodes)) {
+            return false;
+        }
+
+        if (parentNodes.contains(givenTaskNode)) {
+            return true;
+        }
+
+        Set<String> postNodesOfParent = new HashSet<>();
+        Set<String> visited = new HashSet<>();
+        while (CollectionUtils.isNotEmpty(parentNodes)) {
+            Set<String> tmp = new HashSet<>();
+            for (String parentNode : parentNodes) {
+                postNodesOfParent.add(parentNode);
+                visited.add(parentNode);
+                workflowDAG.getSubsequentNodes(parentNode).forEach(node -> {
+                    if (!visited.contains(node)) {
+                        tmp.add(node);
+                    }
+                });
+            }
+            parentNodes = tmp;
+        }
+        return postNodesOfParent.contains(givenTaskNode);
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
new file mode 100644
index 0000000000..4a7100010e
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper">
+    <sql id="baseSql">
+        id
+        ,workflow_instance_id, workflow_instance_name, task_name, task_code, status, create_time, update_time
+    </sql>
+
+    <select id="pageQuery" resultType="string">
+        select
+        <include refid="baseSql"/>
+        from t_ds_isolation_task
+        where 1 = 1
+        <if test="workflowInstanceName != null">
+            and workflow_instance_name = #{workflowInstanceName}
+        </if>
+        <if test="taskName != null">
+            and task_name = #{taskName}
+        </if>
+    </select>
+
+    <select id="queryByTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+        select
+        <include refid="baseSql"/>
+        from t_ds_isolation_task
+        where workflow_instance_id = #{workflowInstanceId}
+        and task_code in
+        <foreach collection="taskCodes" item="taskCode" open="(" separator="," close=")">
+            #{taskCode}
+        </foreach>
+    </select>
+
+    <select id="queryByWorkflowInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+        select
+        <include refid="baseSql"/>
+        from t_ds_isolation_task
+        where workflow_instance_id = #{workflowInstanceId} and status = #{status}
+    </select>
+
+    <select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+        select
+        <include refid="baseSql"/>
+        from t_ds_isolation_task
+        where  status = #{status}
+    </select>
+
+    <update id="updateIsolationTaskStatus">
+        update t_ds_isolation_task
+        set status = #{status}
+        where id = #{id}
+    </update>
+
+    <insert id="batchInsert">
+        insert into t_ds_isolation_task (
+        workflow_instance_id,
+        workflow_instance_name,
+        task_name,
+        task_code,
+        status
+        )
+        values
+        <foreach collection="isolationTasks" item="isolationTask" separator=",">
+            (
+            #{isolationTask.workflowInstanceId},
+            #{isolationTask.workflowInstanceName},
+            #{isolationTask.taskName},
+            #{isolationTask.taskCode},
+            #{isolationTask.status}
+            )
+        </foreach>
+    </insert>
+
+    <delete id="deleteByIdAndStatus">
+        delete
+        from t_ds_isolation_task
+        where id = #{id}
+          and status = #{status}
+    </delete>
+
+</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index 2422318c10..8f6df6738c 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -1977,3 +1977,20 @@ CREATE TABLE t_ds_fav
     user_id   int         NOT NULL,
     PRIMARY KEY (id)
 );
+
+-- ----------------------------
+-- Table structure for t_ds_isolation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_isolation_task`;
+CREATE TABLE `t_ds_isolation_task`
+(
+    `id`                     bigint       NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `workflow_instance_id`   bigint       NOT NULL COMMENT 'workflowInstanceId of the isolation task',
+    `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
+    `task_name`              varchar(255) NOT NULL COMMENT 'isolation task name',
+    `task_code`              bigint       NOT NULL COMMENT 'isolation task code',
+    `status`                 tinyint(4) DEFAULT '0',
+    `create_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+    `update_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`)
+);
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 6df6a9b0fe..e5b69db827 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -1896,22 +1896,22 @@ CREATE TABLE `t_ds_k8s` (
 -- Table structure for t_ds_k8s_namespace
 -- ----------------------------
 DROP TABLE IF EXISTS `t_ds_k8s_namespace`;
-CREATE TABLE `t_ds_k8s_namespace` (
-  `id` int(11) NOT NULL AUTO_INCREMENT,
-  `limits_memory` int(11) DEFAULT NULL,
-  `namespace` varchar(100) DEFAULT NULL,
-  `online_job_num` int(11) DEFAULT NULL,
-  `owner` varchar(100) DEFAULT NULL,
-  `pod_replicas` int(11) DEFAULT NULL,
-  `pod_request_cpu` decimal(14,3) DEFAULT NULL,
-  `pod_request_memory` int(11) DEFAULT NULL,
-  `tag` varchar(100) DEFAULT NULL,
-  `limits_cpu` decimal(14,3) DEFAULT NULL,
-  `k8s` varchar(100) DEFAULT NULL,
-  `create_time` datetime DEFAULT NULL COMMENT 'create time',
-  `update_time` datetime DEFAULT NULL COMMENT 'update time',
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
+CREATE TABLE `t_ds_k8s_namespace`(
+                                     `id`                 int(11) NOT NULL AUTO_INCREMENT,
+                                     `limits_memory`      int(11) DEFAULT NULL,
+                                     `namespace`          varchar(100)   DEFAULT NULL,
+                                     `online_job_num`     int(11) DEFAULT NULL,
+                                     `owner`              varchar(100)   DEFAULT NULL,
+                                     `pod_replicas`       int(11) DEFAULT NULL,
+                                     `pod_request_cpu`    decimal(14, 3) DEFAULT NULL,
+                                     `pod_request_memory` int(11) DEFAULT NULL,
+                                     `tag`                varchar(100)   DEFAULT NULL,
+                                     `limits_cpu`         decimal(14, 3) DEFAULT NULL,
+                                     `k8s`                varchar(100)   DEFAULT NULL,
+                                     `create_time`        datetime       DEFAULT NULL COMMENT 'create time',
+                                     `update_time`        datetime       DEFAULT NULL COMMENT 'update time',
+                                     PRIMARY KEY (`id`),
+                                     UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
 ) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
 
 
@@ -1919,21 +1919,22 @@ CREATE TABLE `t_ds_k8s_namespace` (
 -- Table structure for t_ds_alert_send_status
 -- ----------------------------
 DROP TABLE IF EXISTS t_ds_alert_send_status;
-CREATE TABLE t_ds_alert_send_status(
-   `id`                       int(11) NOT NULL AUTO_INCREMENT,
-   `alert_id`                 int(11) NOT NULL,
-   `alert_plugin_instance_id` int(11) NOT NULL,
-   `send_status`              tinyint(4) DEFAULT '0',
-   `log`                      text,
-   `create_time`              datetime DEFAULT NULL COMMENT 'create time',
-   PRIMARY KEY (`id`),
-   UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)
-) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
+DROP TABLE IF EXISTS `t_ds_fav_task`;
 
 -- ----------------------------
 -- Table structure for t_ds_fav_task
 -- ----------------------------
-DROP TABLE IF EXISTS `t_ds_fav_task`;
+CREATE TABLE t_ds_alert_send_status
+(
+    `id`                       int(11) NOT NULL AUTO_INCREMENT,
+    `alert_id`                 int(11) NOT NULL,
+    `alert_plugin_instance_id` int(11) NOT NULL,
+    `send_status`              tinyint(4) DEFAULT '0',
+    `log`                      text,
+    `create_time`              datetime DEFAULT NULL COMMENT 'create time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 CREATE TABLE `t_ds_fav_task`
 (
     `id`        bigint      NOT NULL AUTO_INCREMENT COMMENT 'favorite task id',
@@ -1943,3 +1944,21 @@ CREATE TABLE `t_ds_fav_task`
 ) ENGINE = InnoDB
   AUTO_INCREMENT = 1
   DEFAULT CHARSET = utf8;
+
+-- ----------------------------
+-- Table structure for t_ds_isolation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_isolation_task`;
+CREATE TABLE `t_ds_isolation_task`
+(
+    `id`                     bigint       NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `workflow_instance_id`   bigint       NOT NULL COMMENT 'workflowInstanceId of the isolation task',
+    `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
+    `task_name`              varchar(255) NOT NULL COMMENT 'isolation task name',
+    `task_code`              bigint       NOT NULL COMMENT 'isolation task code',
+    `status`                 tinyint(4) DEFAULT '0',
+    `create_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+    `update_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`),
+    KEY `workflow_instance_index` (`workflow_instance_id`) USING BTREE
+) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index 15bf31236a..f9bb5587cc 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -1941,3 +1941,21 @@ CREATE TABLE t_ds_fav
     user_id   int         NOT NULL,
     PRIMARY KEY (id)
 );
+
+-- ----------------------------
+-- Table structure for t_ds_isolation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_isolation_task`;
+CREATE TABLE `t_ds_isolation_task`
+(
+    `id`                     bigint       NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `workflow_instance_id`   bigint       NOT NULL COMMENT 'workflowInstanceId of the isolation task',
+    `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
+    `task_name`              varchar(255) NOT NULL COMMENT 'isolation task name',
+    `task_code`              bigint       NOT NULL COMMENT 'isolation task code',
+    `status`                 tinyint(4) DEFAULT '0',
+    `create_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+    `update_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`),
+    CONSTRAINT               `workflow_instance_index` (`workflow_instance_id`)
+);
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
index a80660b549..de1890f150 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.dolphinscheduler.dao.utils;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Sets;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.graph.DAG;
@@ -32,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
+import org.junit.Assert;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,10 +42,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
 
 /**
  * dag helper test
@@ -335,9 +334,8 @@ public class DagHelperTest {
      * 2->8->5->7
      *
      * @return dag
-     * @throws JsonProcessingException if error throws JsonProcessingException
      */
-    private DAG<String, TaskNode, TaskNodeRelation> generateDag() throws IOException {
+    private DAG<String, TaskNode, TaskNodeRelation> generateDag() {
         List<TaskNode> taskNodeList = new ArrayList<>();
         TaskNode node1 = new TaskNode();
         node1.setId("1");
@@ -529,4 +527,23 @@ public class DagHelperTest {
         Assert.assertNotNull(dag);
     }
 
+    @Test
+    public void getAllPostNodes() {
+        DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
+        Set<String> allPostNodes = DagHelper.getAllPostNodes("2", dag);
+        Assert.assertEquals(Sets.newHashSet("2", "3", "5", "6", "7", "8"), allPostNodes);
+    }
+
+    @Test
+    public void getAllPreNodes() {
+        DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
+        Set<String> allPostNodes = DagHelper.getAllPreNodes("2", dag);
+        Assert.assertEquals(Sets.newHashSet("1", "2"), allPostNodes);
+    }
+
+    @Test
+    public void isChildOfAnyParentNodes() {
+        DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
+        Assert.assertTrue(DagHelper.isChildOfAnyParentNodes("2", Sets.newHashSet("1"), dag));
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
new file mode 100644
index 0000000000..826a456fd8
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
@@ -0,0 +1,26 @@
+package org.apache.dolphinscheduler.server.master.processor.isolation;
+
+import io.netty.channel.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class RefreshIsolationTaskProcessor implements NettyRequestProcessor {
+
+    @Autowired
+    private IsolationTaskManager isolationTaskManager;
+
+    @Override
+    public void process(Channel channel, Command command) {
+        if (command.getType() != CommandType.REFRESH_ISOLATION_REQUEST) {
+            throw new IllegalArgumentException(String.format("The current rpc command: %s is invalidated", command));
+        }
+        isolationTaskManager.refreshIsolationTaskMapFromDB();
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index c2eb200059..eb4d82cd3d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.rpc;
 
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingData
 
 import javax.annotation.PostConstruct;
 
+import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationTaskProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -74,6 +76,9 @@ public class MasterRPCServer implements AutoCloseable {
     @Autowired
     private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
 
+    @Autowired
+    private RefreshIsolationTaskProcessor refreshIsolationTaskProcessor;
+
     public void start() {
         logger.info("Starting Master RPC Server...");
         // init remoting server
@@ -96,6 +101,9 @@ public class MasterRPCServer implements AutoCloseable {
         this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
 
+        this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_REQUEST,
+                refreshIsolationTaskProcessor);
+
         this.nettyRemotingServer.start();
         logger.info("Started Master RPC Server...");
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 5546b474d7..6446248c06 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
 import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,9 @@ public class FailoverExecuteThread extends BaseDaemonThread {
     @Autowired
     private MasterFailoverService masterFailoverService;
 
+    @Autowired
+    private IsolationTaskManager isolationTaskManager;
+
     protected FailoverExecuteThread() {
         super("FailoverExecuteThread");
     }
@@ -66,6 +70,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
                 // todo: DO we need to schedule a task to do this kind of check
                 // This kind of check may only need to be executed when a master server start
                 masterFailoverService.checkMasterFailover();
+                isolationTaskManager.refreshIsolationTaskMapFromDB();
             } catch (Exception e) {
                 logger.error("Master failover thread execute error", e);
             } finally {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 24982437a2..1d823a71a7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@@ -108,6 +109,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
     @Autowired
     private StateEventCallbackService stateEventCallbackService;
 
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
     private String masterAddress;
 
     protected MasterSchedulerBootstrap() {
@@ -184,7 +188,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
                                 processAlertManager,
                                 masterConfig,
                                 stateWheelExecuteThread,
-                                curingGlobalParamsService);
+                                curingGlobalParamsService,
+                                isolationTaskDao);
                         processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                         workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                                 processInstance.getId()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index d05bdf8015..e102238178 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -37,8 +37,10 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.Environment;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
@@ -47,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@@ -188,6 +191,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     private final Set<String> stateCleanTaskCodes = new HashSet<>();
 
+    // isolationTaskCode -> isolatedTimes
+    private final Map<Long, Integer> isolationTaskCodesToTimesMap = new HashMap<>();
+
     /**
      * state event queue
      */
@@ -208,6 +214,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     private final CuringParamsService curingParamsService;
 
+    private final IsolationTaskDao isolationTaskDao;
+
     private final String masterAddress;
 
     /**
@@ -227,7 +235,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                                    @NonNull ProcessAlertManager processAlertManager,
                                    @NonNull MasterConfig masterConfig,
                                    @NonNull StateWheelExecuteThread stateWheelExecuteThread,
-                                   @NonNull CuringParamsService curingParamsService) {
+                                   @NonNull CuringParamsService curingParamsService,
+                                   @NonNull IsolationTaskDao isolationTaskDao) {
         this.processService = processService;
         this.processInstanceDao = processInstanceDao;
         this.processInstance = processInstance;
@@ -235,6 +244,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         this.processAlertManager = processAlertManager;
         this.stateWheelExecuteThread = stateWheelExecuteThread;
         this.curingParamsService = curingParamsService;
+        this.isolationTaskDao = isolationTaskDao;
         this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
         TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
     }
@@ -435,6 +445,84 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
     }
 
+    public void onlineTaskIsolation(long taskCode) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            // find the post running task
+            Set<String> needToOnlineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
+            // if the current task is finished, kill the post task
+            // we need to submit an online isolation task event, otherwise there may exist concurrent problem
+            for (String isolationTaskCodeStr : needToOnlineIsolationTaskCodes) {
+                Long isolationTaskCode = Long.valueOf(isolationTaskCodeStr);
+                isolationTaskCodesToTimesMap.put(isolationTaskCode,
+                        isolationTaskCodesToTimesMap.getOrDefault(isolationTaskCode, 0) + 1);
+                ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
+                if (iTaskProcessor == null) {
+                    logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}",
+                            isolationTaskCode);
+                    continue;
+                }
+                iTaskProcessor.action(TaskAction.ISOLATE);
+                StateEvent stateEvent = new StateEvent();
+                stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+                stateEvent.setProcessInstanceId(this.processInstance.getId());
+                stateEvent.setTaskInstanceId(iTaskProcessor.taskInstance().getId());
+                stateEvent.setExecutionStatus(iTaskProcessor.taskInstance().getState());
+                this.addStateEvent(stateEvent);
+            }
+        } finally {
+            LoggerUtils.removeWorkflowInstanceIdMDC();
+        }
+    }
+
+    public void cancelTaskIsolation(long taskCode) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            // todo:
+            // restart the killed/ paused task
+            Set<String> needToOfflineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
+            for (String needToOfflineIsolationTaskCodeStr : needToOfflineIsolationTaskCodes) {
+                Long isolationTaskCode = Long.valueOf(needToOfflineIsolationTaskCodeStr);
+                Integer isolateTimes = isolationTaskCodesToTimesMap.get(isolationTaskCode);
+                if (isolateTimes == null) {
+                    logger.warn(
+                            "The current task has not been isolated, so it don't need to offline isolation, taskCode: {}",
+                            isolationTaskCode);
+                    continue;
+                }
+                if (isolateTimes == 1) {
+                    isolationTaskCodesToTimesMap.remove(isolationTaskCode);
+                    ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
+                    if (iTaskProcessor == null) {
+                        // the current task has not been submitted
+                        continue;
+                    }
+                    // the current task has no pre isolation, restart
+                    TaskInstance taskInstance = iTaskProcessor.taskInstance();
+                    if (taskInstance.getState() == ExecutionStatus.PAUSE_BY_ISOLATION) {
+                        taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+                        addTaskToStandByList(taskInstance);
+                        logger.info(
+                                "Cancel isolation task, the current task state is pause_by_isolation, change to submitted_success and add it back to standbyList");
+                        continue;
+                    }
+                    if (taskInstance.getState() == ExecutionStatus.KILL_BY_ISOLATION) {
+                        addTaskToStandByList(cloneCancelIsolationTaskInstance(taskInstance));
+                        logger.info(
+                                "Cancel isolation task, the current task state is kill_by_isolation, change to submitted_success and add it back to standbyList");
+                        continue;
+                    }
+                } else {
+                    isolationTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1);
+                }
+
+            }
+        } finally {
+            LoggerUtils.removeWorkflowInstanceIdMDC();
+        }
+
+    }
+
     /**
      * crate new task instance to retry, different objects from the original
      *
@@ -798,6 +886,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
         // generate process dag
         dag = DagHelper.buildDagGraph(processDag);
+        List<IsolationTask> isolationTasks =
+                isolationTaskDao.queryByWorkflowInstanceId(processInstance.getId(), IsolationTaskStatus.ONLINE);
+        for (IsolationTask isolationTask : isolationTasks) {
+            Set<String> allPostNodes = DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag);
+            allPostNodes.forEach(postNode -> {
+                isolationTaskCodesToTimesMap.put(isolationTask.getTaskCode(),
+                        isolationTaskCodesToTimesMap.getOrDefault(isolationTask.getTaskCode(), 0) + 1);
+            });
+        }
         logger.info("Build dag success, dag: {}", dag);
     }
 
@@ -933,6 +1030,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      */
     private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
         try {
+            if (isolationTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) {
+                taskInstance.setState(ExecutionStatus.PAUSE_BY_ISOLATION);
+                logger.info("The current task has been isolated, will set status to PAUSE_BY_ISOLATION, taskCode: {}",
+                        taskInstance.getTaskCode());
+            }
+
             // package task instance before submit
             processService.packageTaskInstance(taskInstance, processInstance);
 
@@ -1117,6 +1220,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         return newTaskInstance;
     }
 
+    private @Nullable TaskInstance cloneCancelIsolationTaskInstance(TaskInstance taskInstance) {
+        TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
+        if (taskNode == null) {
+            logger.error("clone retry task instance error, taskNode is null, code:{}", taskInstance.getTaskCode());
+            return null;
+        }
+        TaskInstance newTaskInstance = TaskInstanceUtils.cloneTaskInstance(processInstance, taskNode, taskInstance);
+        TaskInstanceUtils.injectEnvironment(taskInstance, getTaskInstanceEnvironment(newTaskInstance));
+        return newTaskInstance;
+    }
+
     private @Nullable Environment getTaskInstanceEnvironment(@NonNull TaskInstance taskInstance) {
         Environment environment = null;
         if (!taskInstance.getEnvironmentCode().equals(-1L)) {
@@ -1468,17 +1582,21 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
         // success
         if (state == ExecutionStatus.RUNNING_EXECUTION) {
-            List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
             if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
                 // tasks currently pending submission, no retries, indicating that depend is waiting to complete
                 return ExecutionStatus.RUNNING_EXECUTION;
-            } else if (CollectionUtils.isNotEmpty(killTasks)) {
+            }
+            if (CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.KILL))) {
                 // tasks maybe killed manually
                 return ExecutionStatus.FAILURE;
-            } else {
-                // if the waiting queue is empty and the status is in progress, then success
-                return ExecutionStatus.SUCCESS;
             }
+            if (CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.KILL_BY_ISOLATION))
+                    || CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.PAUSE_BY_ISOLATION))) {
+                // No task need to submit, and exist isolation task, the workflow instance need to be PAUSE_BY_ISOLATION
+                return ExecutionStatus.PAUSE_BY_ISOLATION;
+            }
+            // if the waiting queue is empty and the status is in progress, then success
+            return ExecutionStatus.SUCCESS;
         }
 
         return state;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index ad3d42a3a5..e86a93ffc1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
@@ -111,6 +112,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     protected boolean timeout = false;
 
+    protected boolean isolated = false;
+
     protected TaskInstance taskInstance = null;
 
     protected ProcessInstance processInstance;
@@ -160,6 +163,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
      */
     protected abstract boolean killTask();
 
+    protected abstract boolean isolateTask();
+
     protected abstract boolean taskTimeout();
 
     protected abstract boolean runTask();
@@ -193,6 +198,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
                 case DISPATCH:
                     result = dispatch();
                     break;
+                case ISOLATE:
+                    result = isolate();
+                    break;
                 default:
                     logger.error("unknown task action: {}", taskAction);
             }
@@ -256,6 +264,14 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         return paused;
     }
 
+    private boolean isolate() {
+        if (isolated) {
+            return true;
+        }
+        isolated = isolateTask();
+        return isolated;
+    }
+
     protected boolean stop() {
         if (killed) {
             return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index 5fcad28858..3285623186 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -21,7 +21,6 @@ import com.google.auto.service.AutoService;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.BlockingOpportunity;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -30,7 +29,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
 import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
-import org.apache.dolphinscheduler.server.utils.LogUtils;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -89,6 +87,15 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    @Override
+    protected boolean isolateTask() {
+        taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+        taskInstance.setEndTime(new Date());
+        processService.saveTaskInstance(taskInstance);
+        logger.info("Blocking task has been isolated");
+        return true;
+    }
+
     @Override
     protected boolean taskTimeout() {
         return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 7b45be6373..e1ef11eef3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
+import com.google.auto.service.AutoService;
 import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -34,8 +35,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 
 import java.util.Date;
 
-import com.google.auto.service.AutoService;
-
 /**
  * common task processor
  */
@@ -130,6 +129,27 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    @Override
+    protected boolean isolateTask() {
+        try {
+            if (taskInstance.getState().typeIsFinished()) {
+                return true;
+            }
+            taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+            taskInstance.setEndTime(new Date());
+            processService.updateTaskInstance(taskInstance);
+            if (StringUtils.isNotEmpty(taskInstance.getHost())) {
+                killRemoteTask();
+            }
+            logger.info("Master isolate taskInstance success, taskName: {} taskInstanceId: {}", taskInstance.getName(),
+                    taskInstance.getId());
+            return true;
+        } catch (Exception e) {
+            logger.error("Master isolate task error, taskInstance id: {}", taskInstance.getId(), e);
+            return false;
+        }
+    }
+
     @Override
     protected boolean taskTimeout() {
         return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 7d3669c443..5c76496aa3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.server.master.runner.task;
 
 import com.google.auto.service.AutoService;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -27,7 +26,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
 import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
 import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
-import org.apache.dolphinscheduler.server.utils.LogUtils;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -106,6 +104,15 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    @Override
+    protected boolean isolateTask() {
+        taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+        taskInstance.setEndTime(new Date());
+        processService.saveTaskInstance(taskInstance);
+        logger.info("Condition task has been isolated");
+        return true;
+    }
+
     @Override
     public String getType() {
         return TASK_TYPE_CONDITIONS;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 6dc5cc6b7f..d5fe056820 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -202,6 +202,14 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    @Override
+    protected boolean isolateTask() {
+        taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+        taskInstance.setEndTime(new Date());
+        processService.saveTaskInstance(taskInstance);
+        return true;
+    }
+
     /**
      * judge all dependent tasks finish
      *
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 754eba7d50..a681366ea5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -201,6 +201,17 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    @Override
+    protected boolean isolateTask() {
+        ProcessInstance subProcessInstance =
+                processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
+        if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) {
+            return true;
+        }
+        // todo: isolate sub process
+        return true;
+    }
+
     private void sendToSubProcess() {
         StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
                 processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index e3ba4c4db0..8ed3c42ce8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -85,6 +85,14 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    @Override
+    protected boolean isolateTask() {
+        taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+        taskInstance.setEndTime(new Date());
+        processService.saveTaskInstance(taskInstance);
+        return true;
+    }
+
     @Override
     protected boolean taskTimeout() {
         return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
index 9044945258..2c60d364bc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
@@ -22,6 +22,7 @@ package org.apache.dolphinscheduler.server.master.runner.task;
  */
 public enum TaskAction {
     PAUSE,
+    ISOLATE,
     STOP,
     TIMEOUT,
     SUBMIT,
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
new file mode 100644
index 0000000000..2621766d84
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
@@ -0,0 +1,97 @@
+package org.apache.dolphinscheduler.server.master.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+public class IsolationTaskManager {
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    private volatile Map<Long, IsolationTask> onlineIsolationTasksInMemory = new HashMap<>();
+
+    private volatile Set<Long> currentIsolationTaskIdsInMemory = new HashSet<>();
+
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
+    @PostConstruct
+    public void init() {
+        refreshIsolationTaskMapFromDB();
+    }
+
+    public void refreshIsolationTaskMapFromDB() {
+        Map<Long, IsolationTask> totalOnlineIsolationTasksInDB =
+                isolationTaskDao.queryByStatus(IsolationTaskStatus.ONLINE)
+                        .stream()
+                        .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
+        Set<Long> totalOnlineIsolationTaskCodesInDB = totalOnlineIsolationTasksInDB.keySet();
+
+        Collection<IsolationTask> needToOfflineIsolationTasks =
+                CollectionUtils.subtract(currentIsolationTaskIdsInMemory, totalOnlineIsolationTaskCodesInDB)
+                        .stream()
+                        .map(onlineIsolationTasksInMemory::get)
+                        .collect(Collectors.toList());
+
+        Collection<IsolationTask> needToOnlineIsolationTasks =
+                CollectionUtils.subtract(totalOnlineIsolationTaskCodesInDB, currentIsolationTaskIdsInMemory)
+                        .stream()
+                        .map(totalOnlineIsolationTasksInDB::get)
+                        .collect(Collectors.toList());
+
+        currentIsolationTaskIdsInMemory = totalOnlineIsolationTaskCodesInDB;
+        onlineIsolationTasksInMemory = totalOnlineIsolationTasksInDB;
+
+        offlineIsolationTask(needToOfflineIsolationTasks);
+        onlineIsolationTask(needToOnlineIsolationTasks);
+    }
+
+    private void offlineIsolationTask(Collection<IsolationTask> needOfflineIsolationTasks) {
+        if (CollectionUtils.isEmpty(needOfflineIsolationTasks)) {
+            return;
+        }
+        for (IsolationTask needOfflineIsolation : needOfflineIsolationTasks) {
+            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
+                    .getByProcessInstanceId(needOfflineIsolation.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                continue;
+            }
+            workflowExecuteRunnable.cancelTaskIsolation(needOfflineIsolation.getTaskCode());
+            log.info("Backend offline isolation task, isolationTaskId: {}", needOfflineIsolation.getId());
+        }
+    }
+
+    private void onlineIsolationTask(Collection<IsolationTask> needOnlineIsolationTasks) {
+        if (CollectionUtils.isEmpty(needOnlineIsolationTasks)) {
+            return;
+        }
+        for (IsolationTask needOnlineIsolationTask : needOnlineIsolationTasks) {
+            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
+                    .getByProcessInstanceId(needOnlineIsolationTask.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                continue;
+            }
+            workflowExecuteRunnable.onlineTaskIsolation(needOnlineIsolationTask.getTaskCode());
+            log.info("Backend online isolation task, isolationTaskId: {}", needOnlineIsolationTask.getId());
+        }
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java
index 5d86abc8e4..e4bdf3511c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java
@@ -166,7 +166,6 @@ public class TaskInstanceUtils {
         taskInstance.setTaskDefine(oldTaskInstance.getTaskDefine());
         taskInstance.setProcessDefine(oldTaskInstance.getProcessDefine());
         taskInstance.setProcessInstance(processInstance);
-        taskInstance.setState(oldTaskInstance.getState());
         return taskInstance;
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
index e1088bf83d..00ae91019d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
@@ -36,6 +36,8 @@ public class WorkflowInstanceUtils {
                 return getStartTaskInstanceIdsFromRecoverParam(processInstance);
             case START_FROM_STATE_CLEAN_TASKS:
                 return getStartTaskInstanceIdsFromStateCleanParam(processInstance);
+            case RECOVERY_FROM_ISOLATION_TASKS:
+                return getStartTaskInstanceIdsFromRecoverIsolationParam(processInstance);
             default:
                 return Collections.emptyList();
         }
@@ -69,6 +71,22 @@ public class WorkflowInstanceUtils {
         return stateCleanTaskInstanceIds;
     }
 
+    public static List<Integer> getStartTaskInstanceIdsFromRecoverIsolationParam(@NonNull ProcessInstance processInstance) {
+        Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
+        List<Integer> recoveryPausedIsolationIds =
+                JSONUtils.parseObject(commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
+                        new TypeReference<ArrayList<Integer>>() {
+                        });
+        List<Integer> recoveryKilledIsolationIds =
+                JSONUtils.parseObject(commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
+                        new TypeReference<ArrayList<Integer>>() {
+                        });
+        List<Integer> result = new ArrayList<>();
+        result.addAll(recoveryPausedIsolationIds);
+        result.addAll(recoveryKilledIsolationIds);
+        return result;
+    }
+
     public static List<String> getStartNodeName(@NonNull ProcessInstance processInstance) {
         List<String> startNodeNameList = new ArrayList<>();
         Map<String, String> paramMap = JSONUtils.toMap(processInstance.getCommandParam());
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 6f55e1d5b5..856d38cb2f 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -27,6 +27,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.common.graph.DAG;
@@ -92,6 +93,8 @@ public class WorkflowExecuteRunnableTest {
 
     private CuringParamsService curingGlobalParamsService;
 
+    private IsolationTaskDao isolationTaskDao;
+
     @Before
     public void init() throws Exception {
         applicationContext = mock(ApplicationContext.class);
@@ -124,10 +127,11 @@ public class WorkflowExecuteRunnableTest {
         curingGlobalParamsService = mock(CuringParamsService.class);
         NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
         ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
+        isolationTaskDao = mock(IsolationTaskDao.class);
         workflowExecuteThread =
                 PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao,
                         nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread,
-                        curingGlobalParamsService));
+                        curingGlobalParamsService, isolationTaskDao));
         // prepareProcess init dag
         Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
         dag.setAccessible(true);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 37f0b97f22..daf92071b5 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -119,5 +119,9 @@ public enum CommandType {
     /**
      * workflow executing data response, from master to api
      */
-    WORKFLOW_EXECUTING_DATA_RESPONSE;
+    WORKFLOW_EXECUTING_DATA_RESPONSE,
+
+    REFRESH_ISOLATION_REQUEST,
+    ;
+
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
new file mode 100644
index 0000000000..9b097843c4
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.remote.command.isolation;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.io.Serializable;
+
+public class RefreshIsolationTaskRequest implements Serializable {
+
+    public Command convert2Command() {
+        Command command = new Command();
+        command.setType(CommandType.REFRESH_ISOLATION_REQUEST);
+        byte[] body = JSONUtils.toJsonByteArray(this);
+        command.setBody(body);
+        return command;
+    }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 009b872681..c1fe78e78a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1047,6 +1047,27 @@ public class ProcessServiceImpl implements ProcessService {
                 processInstance.setRunTimes(runTime + 1);
                 processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 break;
+            case RECOVERY_FROM_ISOLATION_TASKS:
+                Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
+                List<Integer> recoveryPausedIsolationIds =
+                        JSONUtils.parseObject(
+                                commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
+                                new TypeReference<ArrayList<Integer>>() {
+                                });
+                if (CollectionUtils.isNotEmpty(recoveryPausedIsolationIds)) {
+                    recoveryPausedIsolationIds.forEach(id -> initTaskInstance(findTaskInstanceById(id)));
+                }
+                List<Integer> recoveryKilledIsolationIds =
+                        JSONUtils.parseObject(
+                                commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
+                                new TypeReference<ArrayList<Integer>>() {
+                                });
+                if (CollectionUtils.isNotEmpty(recoveryKilledIsolationIds)) {
+                    recoveryKilledIsolationIds.forEach(id -> initTaskInstance(findTaskInstanceById(id)));
+                }
+                processInstance.setRunTimes(runTime + 1);
+                processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                break;
             default:
                 break;
         }
@@ -1564,9 +1585,6 @@ public class ProcessServiceImpl implements ProcessService {
                     taskInstance.getTaskCode());
             return null;
         }
-        if (processInstanceState == ExecutionStatus.READY_PAUSE) {
-            taskInstance.setState(ExecutionStatus.PAUSE);
-        }
         taskInstance.setExecutorId(processInstance.getExecutorId());
         taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
         if (taskInstance.getSubmitTime() == null) {
@@ -1603,7 +1621,8 @@ public class ProcessServiceImpl implements ProcessService {
         if (state == ExecutionStatus.RUNNING_EXECUTION
                 || state == ExecutionStatus.DELAY_EXECUTION
                 || state == ExecutionStatus.KILL
-                || state == ExecutionStatus.DISPATCH) {
+                || state == ExecutionStatus.DISPATCH
+                || state == ExecutionStatus.PAUSE_BY_ISOLATION) {
             return state;
         }
         // return pasue /stop if process instance state is ready pause / stop
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
index da728b2fdb..a83184e16b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
@@ -17,10 +17,10 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.enums;
 
-import java.util.HashMap;
-
 import com.baomidou.mybatisplus.annotation.EnumValue;
 
+import java.util.HashMap;
+
 /**
  * running status for workflow and task nodes
  */
@@ -65,6 +65,8 @@ public enum ExecutionStatus {
     READY_BLOCK(15, "ready block"),
     BLOCK(16, "block"),
     DISPATCH(17, "dispatch"),
+    PAUSE_BY_ISOLATION(18, "paused by isolation"),
+    KILL_BY_ISOLATION(19, "killed by isolation"),
     ;
 
     ExecutionStatus(int code, String descp) {
@@ -117,10 +119,14 @@ public enum ExecutionStatus {
      * @return status
      */
     public boolean typeIsFinished() {
-        return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
+        return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() || typeIsPauseByIsolation()
                 || typeIsStop() || typeIsBlock();
     }
 
+    public boolean typeIsReady() {
+        return this == READY_PAUSE || this == READY_STOP || this == READY_BLOCK;
+    }
+
     /**
      * status is waiting thread
      *
@@ -139,6 +145,18 @@ public enum ExecutionStatus {
         return this == PAUSE;
     }
 
+    public boolean typeIsPauseByIsolation() {
+        return this == PAUSE_BY_ISOLATION;
+    }
+
+    public boolean typeIsKilledByIsolation() {
+        return this == KILL_BY_ISOLATION;
+    }
+
+    public boolean typeIsIsolated() {
+        return this == PAUSE_BY_ISOLATION || this == KILL_BY_ISOLATION;
+    }
+
     /**
      * status is pause
      *
@@ -172,7 +190,7 @@ public enum ExecutionStatus {
      * @return status
      */
     public boolean typeIsCancel() {
-        return this == KILL || this == STOP;
+        return this == KILL || this == STOP || this == KILL_BY_ISOLATION;
     }
 
     public int getCode() {