You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/10/28 06:50:15 UTC

[dolphinscheduler] branch dev_wenjun_isolateTask created (now 8268714c9a)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a change to branch dev_wenjun_isolateTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


      at 8268714c9a Implement isolate task api interface

This branch includes the following new commits:

     new 8268714c9a Implement isolate task api interface

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[dolphinscheduler] 01/01: Implement isolate task api interface

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev_wenjun_isolateTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 8268714c9ac26cb2602ebf762ed8284d2a830123
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Oct 12 22:14:37 2022 +0800

    Implement isolate task api interface
---
 .../api/checker/IsolationTaskChecker.java          |  34 +++
 .../api/checker/IsolationTaskCheckerImpl.java      | 206 +++++++++++++++++
 .../api/checker/WorkflowInstanceCheckerImpl.java   |  24 +-
 .../constants/ApiFuncIdentificationConstant.java   |   5 +
 .../api/controller/IsolationTaskController.java    |  96 ++++++++
 .../dto/request/CleanTaskInstanceStateRequest.java |   2 +
 ...equest.java => IsolationTaskCancelRequest.java} |   8 +-
 ...equest.java => IsolationTaskDeleteRequest.java} |   6 +-
 .../dto/request/IsolationTaskListingRequest.java   |  24 ++
 .../dto/request/IsolationTaskSubmitRequest.java    |  18 ++
 .../apache/dolphinscheduler/api/enums/Status.java  |  39 +++-
 .../api/remote/ApiServerRPCClient.java             |  35 +++
 .../api/service/IsolationTaskService.java          |  32 +++
 .../api/service/ProcessInstanceService.java        |   8 +-
 .../api/service/ProjectService.java                |   2 +
 .../api/service/WorkflowDAGService.java            |  15 ++
 .../api/service/impl/IsolationTaskServiceImpl.java | 246 +++++++++++++++++++++
 .../api/service/impl/ProjectServiceImpl.java       |   7 +-
 .../api/service/impl/TaskInstanceServiceImpl.java  |  25 ++-
 .../api/service/impl/WorkflowDagServiceImpl.java   |  64 ++++++
 .../api/transformer/CommandTransformer.java        |  13 +-
 .../api/transformer/CommandTransformerImpl.java    |  70 +++---
 .../api/vo/IsolationTaskExcelParseVO.java          |  25 +++
 .../src/main/resources/static/excel-en.xlsx        | Bin 0 -> 5718 bytes
 .../apache/dolphinscheduler/common/Constants.java  |   5 +-
 .../dolphinscheduler/common/enums/CommandType.java |   2 +
 .../dao/dto/IsolationTaskStatus.java               |  27 +++
 .../dolphinscheduler/dao/entity/IsolationTask.java |  37 ++++
 .../dao/mapper/IsolationTaskMapper.java            |  32 +++
 .../dao/repository/IsolationTaskDao.java           |  39 ++++
 .../dao/repository/ProcessDefinitionLogDao.java    |  12 +
 .../dao/repository/ProcessInstanceDao.java         |   1 +
 .../dao/repository/ProcessTaskRelationLogDao.java  |  11 +
 .../dao/repository/TaskDefinitionLogDao.java       |  12 +
 .../dao/repository/TaskInstanceDao.java            |   2 +
 .../dao/repository/impl/IsolationTaskDaoImpl.java  |  86 +++++++
 .../impl/ProcessDefinitionLogDaoImpl.java          |  25 +++
 .../impl/ProcessTaskRelationLogDaoImpl.java        |  24 ++
 .../repository/impl/TaskDefinitionLogDaoImpl.java  |  44 ++++
 .../dao/repository/impl/TaskInstanceDaoImpl.java   |   6 +
 .../dolphinscheduler/dao/utils/DagHelper.java      |  99 ++++++++-
 .../dao/mapper/IsolationTaskMapper.xml             |  97 ++++++++
 .../src/main/resources/sql/dolphinscheduler_h2.sql |  17 ++
 .../main/resources/sql/dolphinscheduler_mysql.sql  |  73 +++---
 .../resources/sql/dolphinscheduler_postgresql.sql  |  18 ++
 .../dolphinscheduler/dao/utils/DagHelperTest.java  |  33 ++-
 .../isolation/RefreshIsolationTaskProcessor.java   |  26 +++
 .../server/master/rpc/MasterRPCServer.java         |   8 +
 .../master/runner/FailoverExecuteThread.java       |   5 +
 .../master/runner/MasterSchedulerBootstrap.java    |   7 +-
 .../master/runner/WorkflowExecuteRunnable.java     | 130 ++++++++++-
 .../master/runner/task/BaseTaskProcessor.java      |  16 ++
 .../master/runner/task/BlockingTaskProcessor.java  |  11 +-
 .../master/runner/task/CommonTaskProcessor.java    |  24 +-
 .../master/runner/task/ConditionTaskProcessor.java |  11 +-
 .../master/runner/task/DependentTaskProcessor.java |   8 +
 .../master/runner/task/SubTaskProcessor.java       |  11 +
 .../master/runner/task/SwitchTaskProcessor.java    |   8 +
 .../server/master/runner/task/TaskAction.java      |   1 +
 .../master/service/IsolationTaskManager.java       |  97 ++++++++
 .../server/master/utils/TaskInstanceUtils.java     |   1 -
 .../server/master/utils/WorkflowInstanceUtils.java |  18 ++
 .../master/runner/WorkflowExecuteRunnableTest.java |   6 +-
 .../remote/command/CommandType.java                |   6 +-
 .../isolation/RefreshIsolationTaskRequest.java     |  18 ++
 .../service/process/ProcessServiceImpl.java        |  27 ++-
 .../plugin/task/api/enums/ExecutionStatus.java     |  26 ++-
 67 files changed, 2037 insertions(+), 134 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
new file mode 100644
index 0000000000..9ee19f10be
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
@@ -0,0 +1,34 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.List;
+
+public interface IsolationTaskChecker {
+
+    void checkCanSubmitTaskIsolation(@NonNull User loginUser,
+                                     long projectCode,
+                                     ProcessInstance processInstance,
+                                     @NonNull List<IsolationTaskExcelParseVO> voList);
+
+    void checkCanOnlineTaskIsolation(@NonNull User loginUser,
+                                     long projectCode,
+                                     @NonNull ProcessInstance processInstance,
+                                     @NonNull IsolationTask isolationTask);
+
+    void checkCanListingTaskIsolation(@NonNull User loginUser,
+                                      long projectCode);
+
+    void checkCanDeleteTaskIsolation(@NonNull User loginUser,
+                                     long projectCode,
+                                     long isolationId);
+
+    void checkCanCancelTaskIsolation(@NonNull User loginUser,
+                                     long projectCode,
+                                     ProcessInstance processInstance,
+                                     @NonNull IsolationTask isolationTasks);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
new file mode 100644
index 0000000000..99402f7fb6
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
@@ -0,0 +1,206 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_CANCEL;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_DELETE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_LIST;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_ONLINE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_SUBMIT;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
+
+@Component
+public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private ProcessDefinitionLogDao processDefinitionLogDao;
+
+    @Autowired
+    private WorkflowDAGService workflowDAGService;
+
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
+    @Override
+    public void checkCanSubmitTaskIsolation(@NonNull User loginUser,
+                                            long projectCode,
+                                            ProcessInstance processInstance,
+                                            @NonNull List<IsolationTaskExcelParseVO> voList) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+        }
+
+        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
+                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion())
+                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
+        checkSubmitIsolateTaskAuth(loginUser, project, processDefinitionLog);
+
+        DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(processDefinitionLog);
+        checkWorkflowInstanceCanSubmitIsolateTask(processInstance.getName(), processInstance);
+
+        checkTaskNodeCanSubmitIsolateTask(voList, workflowDAG);
+    }
+
+    @Override
+    public void checkCanOnlineTaskIsolation(@NonNull User loginUser,
+                                            long projectCode,
+                                            ProcessInstance processInstance,
+                                            @NonNull IsolationTask isolationTask) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        checkOnlineIsolationTaskAuth(loginUser, project, processInstance);
+
+        if (IsolationTaskStatus.ONLINE.getCode() == isolationTask.getStatus()) {
+            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE);
+        }
+
+        String workflowInstanceName = isolationTask.getWorkflowInstanceName();
+        checkWorkflowInstanceCanSubmitIsolateTask(workflowInstanceName, processInstance);
+    }
+
+    @Override
+    public void checkCanListingTaskIsolation(@NonNull User loginUser, long projectCode) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+
+        projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_LIST);
+    }
+
+    @Override
+    public void checkCanDeleteTaskIsolation(@NonNull User loginUser,
+                                            long projectCode,
+                                            long isolationId) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_DELETE);
+
+        IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
+                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
+        if (isolationTask.getStatus() != IsolationTaskStatus.OFFLINE.getCode()) {
+            throw new ServiceException(ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE);
+        }
+    }
+
+    @Override
+    public void checkCanCancelTaskIsolation(@NonNull User loginUser,
+                                            long projectCode,
+                                            ProcessInstance processInstance,
+                                            @NonNull IsolationTask isolationTask) {
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+        }
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        checkCancelIsolationTaskAuth(loginUser, project, processInstance);
+
+        if (isolationTask.getStatus() == IsolationTaskStatus.OFFLINE.getCode()) {
+            throw new ServiceException(ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL);
+        }
+    }
+
+    private void checkWorkflowInstanceCanSubmitIsolateTask(@NonNull String workflowInstanceName,
+                                                           ProcessInstance processInstance) {
+        // Check whether the given workflow instance can be isolated.
+        // A workflow instance in a ready_xx state cannot be isolated.
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceName);
+        }
+        if (processInstance.getState().typeIsReady()) {
+            throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT, workflowInstanceName);
+        }
+    }
+
+    private void checkTaskNodeCanSubmitIsolateTask(@NonNull List<IsolationTaskExcelParseVO> voList,
+                                                   @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+        for (IsolationTaskExcelParseVO vo : voList) {
+            String taskCodeStr = Long.toString(vo.getTaskCode());
+            // check whether the task node exists in the DAG
+            if (!workflowDAG.containsNode(taskCodeStr)) {
+                throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST, vo.getTaskName());
+            }
+            // check whether any predecessor task already has an online isolation task;
+            // if one exists, we cannot create a new isolation task
+            Set<String> allPreNodes = DagHelper.getAllPreNodes(taskCodeStr, workflowDAG);
+            List<IsolationTask> isolationTasks = isolationTaskDao.queryByTaskCodes(vo.getWorkflowInstanceId(),
+                    allPreNodes.stream().map(Long::parseLong).collect(Collectors.toList()));
+            // todo: Do we need to support the case where the sub isolation task is offline?
+            if (CollectionUtils.isNotEmpty(isolationTasks)) {
+                throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK);
+            }
+        }
+    }
+
+    private void checkSubmitIsolateTaskAuth(@NonNull User loginUser,
+                                            @NonNull Project project,
+                                            @NonNull ProcessDefinitionLog processDefinitionLog) {
+        if (processDefinitionLog.getProjectCode() != project.getCode()) {
+            throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
+        }
+        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_SUBMIT);
+    }
+
+    private void checkOnlineIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+        }
+        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
+                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion())
+                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
+        if (processDefinitionLog.getProjectCode() != project.getCode()) {
+            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
+        }
+
+        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_ONLINE);
+    }
+
+    private void checkCancelIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+        }
+        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
+                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion())
+                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
+        if (processDefinitionLog.getProjectCode() != project.getCode()) {
+            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
+        }
+        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_CANCEL);
+
+    }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java
index 1805934628..05e708d3b0 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java
@@ -2,11 +2,11 @@ package org.apache.dolphinscheduler.api.checker;
 
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ProjectService;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -14,13 +14,12 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
 import static org.apache.dolphinscheduler.api.enums.Status.CLEAN_TASK_INSTANCE_ERROR_WORKFLOW_INSTANCE_IS_RUNNING;
 import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
 import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
-import static org.apache.dolphinscheduler.api.enums.Status.PROJECT_NOT_EXIST;
 
 @Component
 public class WorkflowInstanceCheckerImpl implements WorkflowInstanceChecker {
 
     @Autowired
-    private ProcessDefinitionDao processDefinitionDao;
+    private ProcessDefinitionLogDao processDefinitionLogDao;
 
     @Autowired
     private ProjectService projectService;
@@ -30,16 +29,13 @@ public class WorkflowInstanceCheckerImpl implements WorkflowInstanceChecker {
         if (processInstance == null) {
             throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
         }
-        // todo: check permission
-        ProcessDefinition processDefinition =
-                processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode());
-        if (processDefinition == null) {
-            throw new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getProcessDefinitionCode());
-        }
-        Project project = projectService.queryByCode(loginUser, processDefinition.getProjectCode());
-        if (project == null) {
-            throw new ServiceException(PROJECT_NOT_EXIST);
-        }
+        ProcessDefinitionLog processDefinitionLog =
+                processDefinitionLogDao
+                        .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+                                processInstance.getProcessDefinitionVersion())
+                        .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
+                                processInstance.getProcessDefinitionCode()));
+        Project project = projectService.queryByCode(loginUser, processDefinitionLog.getProjectCode());
         projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CLEAN_TASK_INSTANCE_STATE);
 
         // check state
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
index fba7677ab7..c8cdd2cb58 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
@@ -96,6 +96,11 @@ public class ApiFuncIdentificationConstant {
     public static final String INSTANCE_BATCH_DELETE = "project:process-instance:batch-delete";
     public static final String FORCED_SUCCESS = "project:task-instance:force-success";
     public static final String CLEAN_TASK_INSTANCE_STATE = "project:task-instance:clean-state";
+    public static final String ISOLATION_TASK_LIST = "project:isolation-task:list";
+    public static final String ISOLATION_TASK_ONLINE = "project:isolation-task:online";
+    public static final String ISOLATION_TASK_SUBMIT = "project:isolation-task:submit";
+    public static final String ISOLATION_TASK_CANCEL = "project:isolation-task:cancel";
+    public static final String ISOLATION_TASK_DELETE = "project:isolation-task:delete";
     public static final String VIEW_LOG = "project:log:detail";
     public static final String DOWNLOAD_LOG = "project:log:download-log";
     public static final String PROJECT_OVERVIEW = "project:overview:view";
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
new file mode 100644
index 0000000000..f4b445456d
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
@@ -0,0 +1,96 @@
+package org.apache.dolphinscheduler.api.controller;
+
+import io.swagger.annotations.Api;
+import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.service.IsolationTaskService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+import springfox.documentation.annotations.ApiIgnore;
+
+import javax.validation.Valid;
+
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_LISTING_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR;
+
+@Api(tags = "ISOLATION_TASK_TAG")
+@RestController
+@RequestMapping("/projects/{projectCode}/isolation-task")
+public class IsolationTaskController {
+
+    @Autowired
+    private IsolationTaskService isolationTaskService;
+
+    @PostMapping(value = "/submit")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> submitIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                            @PathVariable long projectCode,
+                                            @RequestBody IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
+        isolationTaskService.submitTaskIsolations(loginUser, projectCode, isolationTaskSubmitRequest);
+        return Result.success(null);
+    }
+
+    @PutMapping(value = "/online/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> onlineIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                            @PathVariable long projectCode,
+                                            @PathVariable(name = "id") long isolationId) {
+        isolationTaskService.onlineTaskIsolation(loginUser, projectCode, isolationId);
+        return Result.success(null);
+    }
+
+    @PutMapping(value = "/cancel/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> cancelIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                            @PathVariable long projectCode,
+                                            @PathVariable(name = "id") long isolationId) {
+        isolationTaskService.cancelTaskIsolation(loginUser, projectCode, isolationId);
+        return Result.success(null);
+    }
+
+    @GetMapping("")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(ISOLATION_TASK_LISTING_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<PageInfo<IsolationTask>> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                                @PathVariable long projectCode,
+                                                                @RequestBody @Valid IsolationTaskListingRequest request) {
+        return Result.success(isolationTaskService.listingTaskIsolation(loginUser, projectCode, request));
+    }
+
+    @DeleteMapping("/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(ISOLATION_TASK_DELETE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> deleteIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                            @PathVariable long projectCode,
+                                            @PathVariable long id) {
+        isolationTaskService.deleteTaskIsolation(loginUser, projectCode, id);
+        return Result.success(null);
+    }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
index c9afc8fabd..3fe81f7a71 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import javax.validation.constraints.NotEmpty;
 import java.util.List;
 
 @Data
@@ -11,5 +12,6 @@ import java.util.List;
 @AllArgsConstructor
 public class CleanTaskInstanceStateRequest {
 
+    @NotEmpty
     private List<Integer> taskInstanceIds;
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java
similarity index 60%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java
index c9afc8fabd..a7e00677b5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java
@@ -4,12 +4,14 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import javax.validation.constraints.NotEmpty;
 import java.util.List;
 
 @Data
-@NoArgsConstructor
 @AllArgsConstructor
-public class CleanTaskInstanceStateRequest {
+@NoArgsConstructor
+public class IsolationTaskCancelRequest {
 
-    private List<Integer> taskInstanceIds;
+    @NotEmpty
+    private List<Long> isolationIds;
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java
similarity index 60%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java
index c9afc8fabd..82104c3c9d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java
@@ -4,12 +4,14 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import javax.validation.constraints.NotEmpty;
 import java.util.List;
 
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
-public class CleanTaskInstanceStateRequest {
+public class IsolationTaskDeleteRequest {
 
-    private List<Integer> taskInstanceIds;
+    @NotEmpty
+    private List<Long> isolationIds;
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java
new file mode 100644
index 0000000000..fecd65e9c6
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java
@@ -0,0 +1,35 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Request body for paging through isolation tasks.
+ * The two name fields carry no constraint; the two paging fields are mandatory and must be positive.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IsolationTaskListingRequest {
+
+    /** Workflow instance name to filter by; carries no validation constraint. */
+    private String workflowInstanceName;
+
+    /** Task name to filter by; carries no validation constraint. */
+    private String taskName;
+
+    /** Page to fetch; must be present and at least 1. */
+    @NotNull(message = "page number cannot be null")
+    @Min(value = 1, message = "page number must be greater than 0")
+    private Integer pageNo;
+
+    /** Number of rows per page; must be present and at least 1. */
+    @NotNull(message = "page size cannot be null")
+    @Min(value = 1, message = "page size must be greater than 0")
+    private Integer pageSize;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java
new file mode 100644
index 0000000000..d89b681eaf
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java
@@ -0,0 +1,22 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+
+import javax.validation.constraints.NotEmpty;
+import java.util.List;
+
+/**
+ * Request body carrying the isolation task entries to submit.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class IsolationTaskSubmitRequest {
+
+    /** Entries to submit; the list must contain at least one element. */
+    @NotEmpty
+    private List<IsolationTaskExcelParseVO> isolationTaskExcelParseVOList;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 37f89b406f..602cb477dd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -513,7 +513,44 @@ public enum Status {
     CONFIG_FILTER_EMPTY(1400007, "complement time is empty after filtering according to the configuration",
             "当前补数时间根据配置过滤后为空"),
     PROJECT_NAME_TOO_LONG_ERROR(1400008, "project name is too long error", "项目名称过长"),
-    ;
+
+    ISOLATION_TASK_NOT_EXIST(1500000, "Isolation task not exist", "隔离任务不存在"),
+    ISOLATION_TASK_SUBMIT_ERROR(1500100, "Submit isolation task error", "提交隔离任务异常"),
+    ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT(1500101,
+            "Submit isolation task error, relate workflow instance [{0}] is not support",
+            "提交隔离任务异常, 关联的工作流实例:[{0}]暂不支持该操作"),
+    ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT(1500102,
+            "Submit isolation task error, relate workflow instance [{0}] is not belong to current project",
+            "提交隔离任务异常, 关联的工作流实例:[{0}]不属于当前项目"),
+    ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST(1500103, "Submit isolation task error, task: [{0}] is not exist",
+            "提交隔离任务异常, 任务不存在:[{0}]"),
+    ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK(1500104,
+            "Submit isolation task error, workflow instance: [{0}] exist an sub isolation task",
+            "提交隔离任务异常, 工作流实例已经存在:[{0}]子隔离任务"),
+    ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500105,
+            "Submit isolation task error, send request to master error",
+            "提交隔离任务异常,发送请求给Master异常"),
+
+    ISOLATION_TASK_ONLINE_ERROR(1500200, "Online isolation task error", "上线隔离任务异常"),
+    ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE(1500201,
+            "Online isolation task error, the isolation task is already online", "上线隔离任务异常,该任务已处于隔离中"),
+    ISOLATION_TASK_ONLINE_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500202,
+            "Online isolation task error, send request to master error", "上线隔离任务异常,发送隔离请求给Master异常"),
+    ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR(1500203,
+            "Online isolation task error, the process instance is not belongs to project", "上线隔离任务异常,当前工作流实例不属于目标任务"),
+
+    ISOLATION_TASK_CANCEL_ERROR(1500300, "Cancel isolation task error", "取消隔离任务异常"),
+    ISOLATION_TASK_CANCEL_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500301,
+            "Cancel isolation task error, send request to master error", "取消隔离任务异常,发送隔离请求给Master异常"),
+    ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL(1500302,
+            "Cancel isolation task error, this task isolation is already been cancel", "取消隔离任务异常,该隔离已经下线"),
+    ISOLATION_TASK_CANCEL_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR(1500303,
+            "Cancel isolation task error, the process instance is not belongs to project", "取消隔离任务异常,当前工作流实例不属于目标任务"),
+    ISOLATION_TASK_LISTING_ERROR(1500400, "Listing isolation task error", "查询隔离任务列表异常"),
+    ISOLATION_TASK_DELETE_ERROR(1500500, "Delete isolation task error", "删除隔离任务异常"),
+    ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE(1500501, "Delete isolation task error, the task status is not offline",
+            "删除隔离任务异常,该隔离任务尚未下线"),
+            ;
 
     private final int code;
     private final String enMsg;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
new file mode 100644
index 0000000000..ef3634cd52
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
@@ -0,0 +1,69 @@
+package org.apache.dolphinscheduler.api.remote;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.springframework.stereotype.Component;
+
+/**
+ * RPC client bean of the api server, a thin wrapper around one {@link NettyRemotingClient}.
+ * <p>
+ * The wrapped client is created together with the bean and released again in
+ * {@link #close()}.
+ */
+@Slf4j
+@Component
+public class ApiServerRPCClient implements AutoCloseable {
+
+    /** Timeout used when the caller does not pass one explicitly. */
+    private static final long DEFAULT_TIME_OUT_MILLS = 1_000L;
+
+    private final NettyRemotingClient client;
+
+    public ApiServerRPCClient() {
+        client = new NettyRemotingClient(new NettyClientConfig());
+        log.info("Success initialized ApiServerRPCClient...");
+    }
+
+    /**
+     * Same as the three argument overload, with {@link #DEFAULT_TIME_OUT_MILLS} as timeout.
+     *
+     * @param host       target server
+     * @param rpcCommand command to send
+     * @throws RemotingException    propagated from the underlying client
+     * @throws InterruptedException propagated from the underlying client
+     */
+    public void sendSyncCommand(@NonNull Host host,
+                                @NonNull Command rpcCommand) throws RemotingException, InterruptedException {
+        sendSyncCommand(host, rpcCommand, DEFAULT_TIME_OUT_MILLS);
+    }
+
+    /**
+     * Hands the command to {@code NettyRemotingClient#sendSync}; whatever that call returns is ignored.
+     *
+     * @param host         target server
+     * @param rpcCommand   command to send
+     * @param timeoutMills timeout passed through to the underlying client
+     * @throws RemotingException    propagated from the underlying client
+     * @throws InterruptedException propagated from the underlying client
+     */
+    public void sendSyncCommand(@NonNull Host host,
+                                @NonNull Command rpcCommand,
+                                long timeoutMills) throws RemotingException, InterruptedException {
+        client.sendSync(host, rpcCommand, timeoutMills);
+    }
+
+    /**
+     * Closes the wrapped netty client so its resources do not outlive the bean. Spring invokes this
+     * on context shutdown because the bean implements {@link AutoCloseable}.
+     */
+    @Override
+    public void close() {
+        client.close();
+        log.info("Closed ApiServerRPCClient...");
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
new file mode 100644
index 0000000000..7225d19007
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
@@ -0,0 +1,32 @@
+package org.apache.dolphinscheduler.api.service;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskCancelRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+/**
+ * Operations on isolation tasks; each one takes the login user and the code of the project.
+ */
+public interface IsolationTaskService {
+
+    /** Submits the isolation tasks described by the request. */
+    void submitTaskIsolations(@NonNull User loginUser, long projectCode,
+                              @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest);
+
+    /** Brings the isolation task with the given id online. */
+    void onlineTaskIsolation(@NonNull User loginUser, long projectCode, long isolationTaskId);
+
+    /** Cancels the isolation with the given id. */
+    void cancelTaskIsolation(@NonNull User loginUser, long projectCode, long isolationId);
+
+    /** Returns one page of isolation tasks for the given listing request. */
+    PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser, long projectCode,
+                                                 @NonNull IsolationTaskListingRequest request);
+
+    /** Deletes the isolation task with the given id. */
+    void deleteTaskIsolation(@NonNull User loginUser, long projectCode, long id);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 58baef26a6..f3a9069a72 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -18,16 +18,16 @@
 
 package org.apache.dolphinscheduler.api.service;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
 /**
  * process instance service
  */
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
index 716e2fba87..64fcf8fd99 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import lombok.NonNull;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
@@ -44,6 +45,7 @@ public interface ProjectService {
      * @param projectCode project code
      * @return project detail information
      */
+    @NonNull
     Project queryByCode(User loginUser, long projectCode);
 
     /**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java
new file mode 100644
index 0000000000..739c11a616
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java
@@ -0,0 +1,22 @@
+package org.apache.dolphinscheduler.api.service;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
+/**
+ * Provides the task DAG of a workflow definition.
+ */
+public interface WorkflowDAGService {
+
+    /** Returns the DAG for the process definition identified by code and version. */
+    DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(
+            @NonNull Long processDefinitionCode,
+            @NonNull Integer processDefinitionVersion);
+
+    /** Returns the DAG for the given process definition log. */
+    DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(
+            @NonNull ProcessDefinitionLog processDefinitionLog);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
new file mode 100644
index 0000000000..4f6ff908b1
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
@@ -0,0 +1,290 @@
+package org.apache.dolphinscheduler.api.service.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.dolphinscheduler.api.checker.IsolationTaskChecker;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.remote.ApiServerRPCClient;
+import org.apache.dolphinscheduler.api.service.IsolationTaskService;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.api.transformer.CommandTransformer;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR;
+
+/**
+ * Default {@link IsolationTaskService} implementation.
+ * <p>
+ * State changes are written through {@link IsolationTaskDao} first; afterwards every master found in
+ * the registry is sent a {@link RefreshIsolationTaskRequest}.
+ */
+@Slf4j
+@Service
+public class IsolationTaskServiceImpl implements IsolationTaskService {
+
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
+    @Autowired
+    private IsolationTaskChecker isolationTaskChecker;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private CommandTransformer commandTransformer;
+
+    @Autowired
+    private ApiServerRPCClient apiServerRPCClient;
+
+    @Autowired
+    private RegistryClient registryClient;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private WorkflowDAGService workflowDAGService;
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    /**
+     * Checks the submitted entries workflow instance by workflow instance, stores all of them as
+     * ONLINE isolation tasks in one batch and then asks the masters to refresh.
+     *
+     * @throws ServiceException when the refresh request to the masters fails
+     */
+    @Override
+    @Transactional
+    public void submitTaskIsolations(@NonNull User loginUser,
+                                     long projectCode,
+                                     @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
+        // group the entries by workflow instance id so each instance is loaded and checked once
+        Map<Integer, List<IsolationTaskExcelParseVO>> workflow2VoMap = new HashMap<>();
+        for (IsolationTaskExcelParseVO vo : isolationTaskSubmitRequest.getIsolationTaskExcelParseVOList()) {
+            workflow2VoMap.computeIfAbsent(vo.getWorkflowInstanceId(), k -> new ArrayList<>()).add(vo);
+        }
+
+        List<IsolationTask> needToInsertIntoDB = new ArrayList<>();
+        for (Map.Entry<Integer, List<IsolationTaskExcelParseVO>> entry : workflow2VoMap.entrySet()) {
+            List<IsolationTaskExcelParseVO> vos = entry.getValue();
+            ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(entry.getKey());
+            isolationTaskChecker.checkCanSubmitTaskIsolation(loginUser, projectCode, processInstance, vos);
+
+            for (IsolationTaskExcelParseVO vo : vos) {
+                needToInsertIntoDB.add(IsolationTask.builder()
+                        .workflowInstanceId(vo.getWorkflowInstanceId())
+                        .workflowInstanceName(vo.getWorkflowInstanceName())
+                        .taskName(vo.getTaskName())
+                        .taskCode(vo.getTaskCode())
+                        .status(IsolationTaskStatus.ONLINE.getCode())
+                        .build());
+            }
+        }
+        isolationTaskDao.batchInsert(needToInsertIntoDB);
+        // we split here to avoid rollback RPC request
+        try {
+            refreshIsolationTasks();
+        } catch (Exception ex) {
+            log.error("Submit isolation task error, refresh isolation task on master failed", ex);
+            throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR);
+        }
+    }
+
+    /**
+     * Marks an existing isolation task as ONLINE and asks the masters to refresh.
+     *
+     * @throws ServiceException when the isolation task does not exist or the refresh request fails
+     */
+    @Override
+    @Transactional
+    public void onlineTaskIsolation(@NonNull User loginUser, long projectCode, long isolationTaskId) {
+        IsolationTask isolationTask = isolationTaskDao.queryById(isolationTaskId)
+                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
+        ProcessInstance processInstance =
+                processInstanceDao.queryProcessInstanceById(isolationTask.getWorkflowInstanceId());
+
+        isolationTaskChecker.checkCanOnlineTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
+        isolationTaskDao.updateIsolationTaskStatus(isolationTaskId, IsolationTaskStatus.ONLINE);
+        try {
+            refreshIsolationTasks();
+        } catch (Exception ex) {
+            log.error("Online isolation task error, refresh isolation task on master failed", ex);
+            // the only step guarded here is the master refresh, so report that specific status
+            throw new ServiceException(Status.ISOLATION_TASK_ONLINE_ERROR_SEND_REQUEST_TO_MASTER_ERROR);
+        }
+    }
+
+    /**
+     * Marks an existing isolation task as OFFLINE, creates a recovery command when the workflow
+     * instance is paused by isolation, and asks the masters to refresh.
+     *
+     * @throws ServiceException when the isolation task does not exist or the refresh request fails
+     */
+    @Override
+    @Transactional
+    public void cancelTaskIsolation(@NonNull User loginUser,
+                                    long projectCode,
+                                    long isolationId) {
+        IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
+                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
+
+        Integer workflowInstanceId = isolationTask.getWorkflowInstanceId();
+        ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+        isolationTaskChecker.checkCanCancelTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
+        isolationTaskDao.updateIsolationTaskStatus(isolationTask.getId(), IsolationTaskStatus.OFFLINE);
+        insertRecoveryCommandIfNeeded(processInstance, isolationTask);
+        try {
+            refreshIsolationTasks();
+        } catch (RemotingException | InterruptedException e) {
+            // already logged together with the failing master in refreshIsolationTasks
+            throw new ServiceException(Status.ISOLATION_TASK_CANCEL_ERROR_SEND_REQUEST_TO_MASTER_ERROR);
+        }
+    }
+
+    /**
+     * Returns one page of isolation tasks filtered by the workflow instance name and task name of
+     * the request.
+     */
+    @Override
+    public PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
+                                                        long projectCode,
+                                                        @NonNull IsolationTaskListingRequest request) {
+        isolationTaskChecker.checkCanListingTaskIsolation(loginUser, projectCode);
+
+        Integer pageNo = request.getPageNo();
+        Integer pageSize = request.getPageSize();
+
+        IPage<IsolationTask> iPage = isolationTaskDao.pageQueryIsolationTask(
+                request.getWorkflowInstanceName(),
+                request.getTaskName(),
+                pageNo,
+                pageSize);
+
+        PageInfo<IsolationTask> pageInfo = new PageInfo<>(pageNo, pageSize);
+        pageInfo.setTotal((int) iPage.getTotal());
+        pageInfo.setTotalList(iPage.getRecords());
+        return pageInfo;
+    }
+
+    /**
+     * Deletes the isolation task with the given id, provided it is OFFLINE.
+     *
+     * @throws ServiceException when no OFFLINE isolation task with that id was deleted
+     */
+    @Override
+    public void deleteTaskIsolation(@NonNull User loginUser, long projectCode, long id) {
+        isolationTaskChecker.checkCanDeleteTaskIsolation(loginUser, projectCode, id);
+        int deleteNum = isolationTaskDao.deleteByIdAndStatus(id, IsolationTaskStatus.OFFLINE);
+        if (deleteNum <= 0) {
+            throw new ServiceException(ISOLATION_TASK_NOT_EXIST);
+        }
+    }
+
+    /**
+     * Sends a {@link RefreshIsolationTaskRequest} to every master listed in the registry, stopping at
+     * the first failure. Does nothing when no master is registered.
+     */
+    private void refreshIsolationTasks() throws RemotingException, InterruptedException {
+        List<Server> masters = registryClient.getServerList(NodeType.MASTER);
+        if (CollectionUtils.isEmpty(masters)) {
+            return;
+        }
+
+        org.apache.dolphinscheduler.remote.command.Command refreshIsolationRequest =
+                new RefreshIsolationTaskRequest().convert2Command();
+        for (Server master : masters) {
+            try {
+                apiServerRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
+                        refreshIsolationRequest);
+            } catch (InterruptedException e) {
+                // callers turn this into a ServiceException, so restore the flag before it is lost
+                Thread.currentThread().interrupt();
+                log.error("Send RefreshIsolationTask request to master error, master: {}", master, e);
+                throw e;
+            } catch (RemotingException e) {
+                log.error("Send RefreshIsolationTask request to master error, master: {}", master, e);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Creates a recovery command for the isolated task instances of the workflow instance for which
+     * {@code DagHelper.isChildOfAnyParentNodes} finds no ONLINE isolation task as parent. Only runs
+     * when the workflow instance state is {@code PAUSE_BY_ISOLATION}.
+     */
+    private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance,
+                                               @NonNull IsolationTask isolationTask) {
+        if (processInstance.getState() != ExecutionStatus.PAUSE_BY_ISOLATION) {
+            return;
+        }
+        int workflowInstanceId = processInstance.getId();
+        // the DAG is needed to tell which task instances still sit below an ONLINE isolation task
+        // it is loaded for the definition code and version recorded on the workflow instance
+        DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(
+                processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion());
+
+        List<TaskInstance> taskInstances =
+                taskInstanceDao.queryValidatedTaskInstanceByWorkflowInstanceId(workflowInstanceId);
+        Set<String> onlineIsolationTaskCodes =
+                isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId, IsolationTaskStatus.ONLINE)
+                        .stream()
+                        .map(onlineIsolationTask -> String.valueOf(onlineIsolationTask.getTaskCode()))
+                        .collect(Collectors.toSet());
+
+        List<TaskInstance> canRecoveryTaskInstances = taskInstances.stream()
+                .filter(taskInstance -> taskInstance.getState().typeIsIsolated())
+                .filter(taskInstance -> !DagHelper.isChildOfAnyParentNodes(String.valueOf(taskInstance.getTaskCode()),
+                        onlineIsolationTaskCodes, workflowDAG))
+                .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(canRecoveryTaskInstances)) {
+            return;
+        }
+        // only the task instances that passed both filters above are handed to the recovery command
+        Command command = commandTransformer.transformToRecoveryFromTaskIsolationCommand(processInstance,
+                canRecoveryTaskInstances);
+        processService.createCommand(command);
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index 7f5bdf086b..3720956c45 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service.impl;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.NonNull;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
@@ -168,7 +169,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
      * @return project detail information
      */
     @Override
-    public Project queryByCode(User loginUser, long projectCode) {
+    public @NonNull Project queryByCode(User loginUser, long projectCode) {
         Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByCode(projectCode);
         hasProjectAndPerm(loginUser, project, result, PROJECT);
@@ -204,10 +205,8 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
                 perm)) {
             // check read permission
             Project checkProject = projectMapper.queryByCode(projectCode);
-            putMsg(result, Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(),
+            throw new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(),
                     Objects.nonNull(checkProject) ? project.getName() : projectCode);
-        } else {
-            putMsg(result, Status.SUCCESS);
         }
     }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index f6da23ac0f..9d1dc166a2 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.api.checker.WorkflowInstanceChecker;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
@@ -34,6 +35,7 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -41,6 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -89,6 +92,12 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
     @Autowired
     private TaskInstanceDao taskInstanceDao;
 
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private WorkflowInstanceChecker workflowInstanceChecker;
+
     @Autowired
     private CommandTransformer commandTransformer;
 
@@ -219,7 +228,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
             List<TaskInstance> existTaskInstances = taskInstanceDao.queryTaskInstanceByIds(taskInstanceIds);
             TaskInstanceValidator.validateTaskInstanceAllExists(taskInstanceIds, existTaskInstances);
 
-            Map<Integer, List<Integer>> processInstanceId2TaskInstanceIds =
+            Map<Integer, List<Integer>> processInstanceId2TaskInstanceIdMap =
                     existTaskInstances.stream().collect(
                             HashMap::new,
                             (map, taskInstance) -> {
@@ -228,11 +237,17 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
                             },
                             Map::putAll);
 
-            List<Command> cleanStateCommands = commandTransformer.transformToCleanTaskInstanceStateCommands(loginUser,
-                    processInstanceId2TaskInstanceIds);
-
             // todo: use batch create and remove the transactional
-            cleanStateCommands.forEach(command -> processService.createCommand(command));
+            for (Map.Entry<Integer, List<Integer>> processInstanceId2TaskInstanceIds : processInstanceId2TaskInstanceIdMap
+                    .entrySet()) {
+                Integer workflowInstanceId = processInstanceId2TaskInstanceIds.getKey();
+                List<Integer> needToCleanStateTaskInstanceIds = processInstanceId2TaskInstanceIds.getValue();
+                ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+                workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, workflowInstance);
+                Command command = commandTransformer.transformToCleanTaskInstanceStateCommand(workflowInstance,
+                        needToCleanStateTaskInstanceIds);
+                processService.createCommand(command);
+            }
         } catch (ServiceException serviceException) {
             throw serviceException;
         } catch (Exception ex) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java
new file mode 100644
index 0000000000..f1ffb0f64d
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java
@@ -0,0 +1,64 @@
+package org.apache.dolphinscheduler.api.service.impl;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
+
+@Service
+public class WorkflowDagServiceImpl implements WorkflowDAGService {
+
+    @Autowired
+    private ProcessDefinitionLogDao processDefinitionLogDao;
+
+    // todo: use dao
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * Looks up the definition log for the given code and version and builds its DAG.
+     * Throws ServiceException(PROCESS_DEFINE_NOT_EXIST) when no such definition log exists.
+     */
+    @Override
+    public DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull Long processDefinitionCode,
+                                                                  @NonNull Integer processDefinitionVersion) {
+        return getWorkflowDAG(processDefinitionLogDao
+                .queryProcessDefinitionByCode(processDefinitionCode, processDefinitionVersion)
+                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processDefinitionCode)));
+    }
+
+    /** Builds the DAG from the task relations and task definitions of the given definition log. */
+    @Override
+    public DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull ProcessDefinitionLog processDefinitionLog) {
+        List<ProcessTaskRelation> relations =
+                processService.findRelationByCode(processDefinitionLog.getCode(), processDefinitionLog.getVersion());
+        List<TaskDefinitionLog> definitions = processService.getTaskDefineLogListByRelation(relations);
+        List<TaskNode> nodes = processService.transformTask(relations, definitions);
+
+        ProcessDag flowDag = DagHelper.generateFlowDag(
+                nodes, Collections.emptyList(), Collections.emptyList(), TaskDependType.TASK_POST);
+        // A null ProcessDag is reported as PROCESS_DAG_IS_EMPTY rather than being passed to buildDagGraph.
+        if (flowDag != null) {
+            return DagHelper.buildDagGraph(flowDag);
+        }
+        throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY);
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java
index 51865effaf..986c2b99a6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java
@@ -2,17 +2,16 @@ package org.apache.dolphinscheduler.api.transformer;
 
 import lombok.NonNull;
 import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 
 import java.util.List;
-import java.util.Map;
 
 public interface CommandTransformer {
 
-    List<Command> transformToCleanTaskInstanceStateCommands(@NonNull User loginUser,
-                                                            @NonNull Map<Integer, List<Integer>> workflowInstanceId2TaskInstanceIds);
+    Command transformToCleanTaskInstanceStateCommand(@NonNull ProcessInstance processInstance,
+                                                     @NonNull List<Integer> needCleanTaskInstanceIds);
 
-    Command transformToCleanTaskInstanceStateCommand(@NonNull User loginUser,
-                                                     @NonNull Integer workflowInstanceId,
-                                                     @NonNull List<Integer> needCleanTaskInstances);
+    Command transformToRecoveryFromTaskIsolationCommand(@NonNull ProcessInstance processInstance,
+                                                        @NonNull List<TaskInstance> canRecoveryTaskInstances);
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java
index 2a0e46033e..9d8aadff43 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java
@@ -1,53 +1,59 @@
 package org.apache.dolphinscheduler.api.transformer;
 
 import lombok.NonNull;
-import org.apache.dolphinscheduler.api.checker.WorkflowInstanceChecker;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.springframework.stereotype.Component;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS;
 
 @Component
 public class CommandTransformerImpl implements CommandTransformer {
 
-    @Autowired
-    private ProcessInstanceDao processInstanceDao;
+    @Override
+    public Command transformToCleanTaskInstanceStateCommand(@NonNull ProcessInstance processInstance,
+                                                            @NonNull List<Integer> needCleanTaskInstanceIds) {
 
-    @Autowired
-    private WorkflowInstanceChecker workflowInstanceChecker;
+        Command command = new Command();
+        command.setCommandType(CommandType.START_FROM_STATE_CLEAN_TASKS);
+        command.setExecutorId(processInstance.getExecutorId());
+        command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+        command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
+        // we set the task post strategy, since we will go through the post task if the post task is not running
+        command.setTaskDependType(TaskDependType.TASK_POST);
+        command.setFailureStrategy(processInstance.getFailureStrategy());
+        command.setWarningType(processInstance.getWarningType());
+        command.setWarningGroupId(processInstance.getWarningGroupId());
+        command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
+        command.setWorkerGroup(processInstance.getWorkerGroup());
+        command.setEnvironmentCode(processInstance.getEnvironmentCode());
+        command.setDryRun(processInstance.getDryRun());
 
-    @Override
-    public List<Command> transformToCleanTaskInstanceStateCommands(@NonNull User loginUser,
-                                                                   @NonNull Map<Integer, List<Integer>> workflowInstanceId2TaskInstanceIds) {
-        List<Command> commands = new ArrayList<>(workflowInstanceId2TaskInstanceIds.size());
-        workflowInstanceId2TaskInstanceIds.forEach((workflowInstanceId, taskInstanceIds) -> {
-            Command command = transformToCleanTaskInstanceStateCommand(loginUser, workflowInstanceId, taskInstanceIds);
-            commands.add(command);
-        });
-        return commands;
+        // todo:use pojo to represent CommandParam rather than map
+        Map<String, String> commandParamMap = new HashMap<>();
+        commandParamMap.put(CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS, JSONUtils.toJsonString(needCleanTaskInstanceIds));
+        command.setCommandParam(JSONUtils.toJsonString(commandParamMap));
+        command.setProcessInstanceId(processInstance.getId());
+        return command;
     }
 
     @Override
-    public Command transformToCleanTaskInstanceStateCommand(@NonNull User loginUser,
-                                                            @NonNull Integer workflowInstanceId,
-                                                            @NonNull List<Integer> needCleanTaskInstances) {
-        ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
-        workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, processInstance);
-
+    public Command transformToRecoveryFromTaskIsolationCommand(@NonNull ProcessInstance processInstance,
+                                                               @NonNull List<TaskInstance> canRecoveryIsolationTaskInstances) {
         Command command = new Command();
-        command.setCommandType(CommandType.START_FROM_STATE_CLEAN_TASKS);
+        command.setCommandType(CommandType.RECOVERY_FROM_ISOLATION_TASKS);
+
         command.setExecutorId(processInstance.getExecutorId());
         command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
         command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
@@ -62,10 +68,22 @@ public class CommandTransformerImpl implements CommandTransformer {
         command.setDryRun(processInstance.getDryRun());
 
         // todo:use pojo to represent CommandParam rather than map
+        List<Integer> recoveryPausedIsolationIds = canRecoveryIsolationTaskInstances.stream()
+                .filter(taskInstance -> taskInstance.getState().typeIsPauseByIsolation())
+                .map(TaskInstance::getId)
+                .collect(Collectors.toList());
+        List<Integer> recoveryKilledIsolationIds = canRecoveryIsolationTaskInstances.stream()
+                .filter(taskInstance -> taskInstance.getState().typeIsKilledByIsolation())
+                .map(TaskInstance::getId)
+                .collect(Collectors.toList());
         Map<String, String> commandParamMap = new HashMap<>();
-        commandParamMap.put(CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS, JSONUtils.toJsonString(needCleanTaskInstances));
+        commandParamMap.put(CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS,
+                JSONUtils.toJsonString(recoveryPausedIsolationIds));
+        commandParamMap.put(CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS,
+                JSONUtils.toJsonString(recoveryKilledIsolationIds));
         command.setCommandParam(JSONUtils.toJsonString(commandParamMap));
         command.setProcessInstanceId(processInstance.getId());
         return command;
     }
+
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java
new file mode 100644
index 0000000000..be63400aed
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java
@@ -0,0 +1,25 @@
+package org.apache.dolphinscheduler.api.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IsolationTaskExcelParseVO {
+    // Value object whose four fields all carry javax.validation @NotNull; presumably one parsed Excel row (confirm).
+    @NotNull(message = "Workflow instance id cannot be null")
+    private Integer workflowInstanceId;
+
+    @NotNull(message = "Workflow instance name cannot be null")
+    private String workflowInstanceName;
+
+    @NotNull(message = "Task code cannot be null")
+    private Long taskCode;
+
+    @NotNull(message = "Task name cannot be null")
+    private String taskName;
+}
diff --git a/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx b/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx
new file mode 100644
index 0000000000..f334cc6e5e
Binary files /dev/null and b/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx differ
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 0499d43c30..efffbc3842 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -338,6 +338,9 @@ public final class Constants {
 
     public static final String CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS = "CleanStateTaskInstanceIds";
 
+    public static final String CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS = "RecoveryPausedIsolationTaskInstanceIds";
+    public static final String CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS = "RecoveryKilledIsolationTaskInstanceIds";
+
     public static final String CMD_PARAM_START_PARAMS = "StartParams";
 
     public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
@@ -859,5 +862,5 @@ public final class Constants {
      * file upload verify
      */
     public static final String FILE_TYPE_RESTRICTED_LIST = "file.type.restricted.list";
-    public static final String FILE_NAME_RESTRICTED_CONTENT= "file.name.restricted.content";
+    public static final String FILE_NAME_RESTRICTED_CONTENT = "file.name.restricted.content";
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index f9bc11fee3..62d538a58b 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -55,6 +55,8 @@ public enum CommandType {
     RECOVER_SERIAL_WAIT(11, "recover serial wait"),
     START_FROM_STATE_CLEAN_TASKS(12, "start from state clean tasks"),
 
+    RECOVERY_FROM_ISOLATION_TASKS(13, "recovery from isolation tasks"),
+
     ;
 
     CommandType(int code, String descp) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
new file mode 100644
index 0000000000..dcc93cc64b
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
@@ -0,0 +1,27 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+public enum IsolationTaskStatus {
+    // Each status carries the int code that IsolationTaskDaoImpl hands to the mapper via getCode().
+    ONLINE(0),
+    OFFLINE(1),
+    ;
+
+    private final int code;
+
+    IsolationTaskStatus(int code) {
+        this.code = code;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    /** Returns the status whose code equals {@code code}; throws IllegalArgumentException for any other value. */
+    public static IsolationTaskStatus of(int code) {
+        for (IsolationTaskStatus value : values()) {
+            if (value.code == code) {
+                return value;
+            }
+        }
+        throw new IllegalArgumentException(String.format("Isolation task status code: %d is invalid", code));
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
new file mode 100644
index 0000000000..8a852aace5
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
@@ -0,0 +1,37 @@
+package org.apache.dolphinscheduler.dao.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Date;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("t_ds_isolation_task")
+public class IsolationTask {
+    // Entity mapped to table t_ds_isolation_task via @TableName; Lombok generates accessors, builder and constructors.
+    @TableId(value = "id", type = IdType.AUTO)
+    private Long id;
+
+    private int workflowInstanceId;
+
+    private String workflowInstanceName;
+
+    private String taskName;
+
+    private long taskCode;
+    // int code; IsolationTaskDaoImpl passes IsolationTaskStatus#getCode() when querying or updating by status.
+    private int status;
+
+    private Date createTime;
+
+    private Date updateTime;
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
new file mode 100644
index 0000000000..babb00c5bb
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
@@ -0,0 +1,32 @@
+package org.apache.dolphinscheduler.dao.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface IsolationTaskMapper extends BaseMapper<IsolationTask> {
+    /** Paged query; the SQL lives outside this interface and presumably filters on the two named parameters. */
+    IPage<IsolationTask> pageQuery(@Param("workflowInstanceName") String workflowInstanceName,
+                                   @Param("taskName") String taskName,
+                                   IPage<IsolationTask> page);
+    /** Queries by workflow instance id and task codes. NOTE(review): Lombok @NonNull adds no check on abstract methods. */
+    List<IsolationTask> queryByTaskCodes(@NonNull @Param("workflowInstanceId") Integer workflowInstanceId,
+                                         @NonNull @Param("taskCodes") List<Long> taskCodes);
+    /** Updates the status (as an int code) of the isolation task with the given id. */
+    void updateIsolationTaskStatus(@Param("id") long isolationTaskId,
+                                   @Param("status") int status);
+    /** Deletes by id and status code; returns an int, presumably the affected row count (confirm in the mapper SQL). */
+    int deleteByIdAndStatus(@Param("id") long isolationTaskId,
+                            @Param("status") int status);
+    /** Queries by workflow instance id and status code. */
+    List<IsolationTask> queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId,
+                                                  @Param("status") int status);
+    /** Inserts the given rows in one mapper call; IsolationTaskDaoImpl skips the call for an empty list. */
+    void batchInsert(@Param("isolationTasks") List<IsolationTask> isolationTasks);
+    /** Queries by status code. */
+    List<IsolationTask> queryByStatus(@Param("status") int code);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
new file mode 100644
index 0000000000..de16a92be0
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
@@ -0,0 +1,39 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public interface IsolationTaskDao {
+    /**
+     * Pages isolation tasks by workflow instance name and task name; pageNumber is documented as starting from 0.
+     * NOTE(review): IsolationTaskDaoImpl passes pageNumber unchanged to MyBatis-Plus Page (1-based current page) - confirm.
+     */
+    IPage<IsolationTask> pageQueryIsolationTask(String workflowInstanceName,
+                                                String taskName,
+                                                int pageNumber,
+                                                int pageSize);
+    /** Queries by workflow instance id and task codes; IsolationTaskDaoImpl returns an empty list for empty taskCodes. */
+    List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes);
+    /** Queries by workflow instance id and status. */
+    List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId, IsolationTaskStatus isolationTaskStatus);
+    /** Looks up one isolation task by id; IsolationTaskDaoImpl returns Optional.empty() when selectById yields null. */
+    Optional<IsolationTask> queryById(long isolationTaskId);
+    /** Looks up isolation tasks by a list of ids. */
+    List<IsolationTask> queryByIds(List<Long> isolationTaskIds);
+    /** Queries by status. */
+    List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus);
+    /** Deletes by id and status; returns the mapper's int result, presumably the affected row count (confirm). */
+    int deleteByIdAndStatus(long id, IsolationTaskStatus status);
+    /** Inserts a single isolation task. */
+    void insert(IsolationTask isolationTaskDTO);
+    /** Updates the status of the isolation task with the given id. */
+    void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus);
+    /** Inserts the given isolation tasks; IsolationTaskDaoImpl does nothing for an empty list. */
+    void batchInsert(List<IsolationTask> isolationTasks);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
new file mode 100644
index 0000000000..a8eba87989
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
+import java.util.Optional;
+
+public interface ProcessDefinitionLogDao {
+    /** Finds the definition log for a code/version pair; the impl maps a null mapper result to Optional.empty(). */
+    Optional<ProcessDefinitionLog> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode,
+                                                                @NonNull Integer processDefinitionVersion);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index f218510191..e789101783 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -38,4 +38,5 @@ public interface ProcessInstanceDao {
      * @param processInstance processInstance
      */
     int upsertProcessInstance(ProcessInstance processInstance);
+
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
new file mode 100644
index 0000000000..6a606e3aef
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
@@ -0,0 +1,11 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+
+import java.util.List;
+
+public interface ProcessTaskRelationLogDao {
+    /** Returns the task relation logs for one workflow definition code and version. */
+    List<ProcessTaskRelationLog> queryProcessTaskRelationLogByWorkflow(long workflowDefinitionCode,
+                                                                       int workflowDefinitionVersion);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
new file mode 100644
index 0000000000..5c8d55b005
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+
+import java.util.List;
+
+public interface TaskDefinitionLogDao {
+    /** Returns task definition logs for the given relations; presumably those the relations reference (confirm in impl). */
+    List<TaskDefinitionLog> queryTaskDefinitionByRelations(List<ProcessTaskRelationLog> processTaskRelations);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index c5b75fb0d9..3918ad8386 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -10,6 +10,8 @@ public interface TaskInstanceDao {
 
     List<TaskInstance> queryTaskInstanceByIds(List<Integer> taskInstanceId);
 
+    List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId);
+
     /**
      * Update the taskInstance, if update failed will throw exception.
      *
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
new file mode 100644
index 0000000000..effab3aa16
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
@@ -0,0 +1,86 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+@Slf4j
+@Repository
+public class IsolationTaskDaoImpl implements IsolationTaskDao {
+
+    @Autowired
+    private IsolationTaskMapper isolationTaskMapper;
+
+    @Override
+    public IPage<IsolationTask> pageQueryIsolationTask(String workflowInstanceName,
+                                                       String taskName,
+                                                       int pageNumber,
+                                                       int pageSize) {
+        Page<IsolationTask> page = new Page<>(pageNumber, pageSize);
+        return isolationTaskMapper.pageQuery(workflowInstanceName, taskName, page);
+    }
+
+    @Override
+    public List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes) {
+        if (CollectionUtils.isEmpty(taskCodes)) {
+            return Collections.emptyList();
+        }
+        return isolationTaskMapper.queryByTaskCodes(workflowInstanceId, taskCodes);
+    }
+
+    @Override
+    public List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId,
+                                                         IsolationTaskStatus isolationTaskStatus) {
+        return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId, isolationTaskStatus.getCode());
+    }
+
+    @Override
+    public Optional<IsolationTask> queryById(long isolationTaskId) {
+        return Optional.ofNullable(isolationTaskMapper.selectById(isolationTaskId));
+    }
+
+    @Override
+    public List<IsolationTask> queryByIds(List<Long> isolationTaskIds) {
+        return isolationTaskMapper.selectBatchIds(isolationTaskIds);
+    }
+
+    @Override
+    public List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus) {
+        return isolationTaskMapper.queryByStatus(isolationTaskStatus.getCode());
+    }
+
+    @Override
+    public int deleteByIdAndStatus(long id, IsolationTaskStatus status) {
+        return isolationTaskMapper.deleteByIdAndStatus(id, status.getCode());
+    }
+
+    @Override
+    public void insert(IsolationTask isolationTask) {
+        isolationTaskMapper.insert(isolationTask);
+    }
+
+    @Override
+    public void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus) {
+        isolationTaskMapper.updateIsolationTaskStatus(isolationTaskId, isolationTaskStatus.getCode());
+    }
+
+    @Override
+    public void batchInsert(List<IsolationTask> isolationTasks) {
+        if (CollectionUtils.isEmpty(isolationTasks)) {
+            return;
+        }
+        isolationTaskMapper.batchInsert(isolationTasks);
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
new file mode 100644
index 0000000000..1c77aea33f
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
@@ -0,0 +1,25 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.Optional;
+
+@Repository
+public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao {
+
+    @Autowired
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
+    /** Wraps the mapper lookup so that a missing definition log surfaces as Optional.empty() instead of null. */
+    @Override
+    public Optional<ProcessDefinitionLog> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode,
+                                                                       @NonNull Integer processDefinitionVersion) {
+        ProcessDefinitionLog definitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+                processDefinitionCode, processDefinitionVersion);
+        return Optional.ofNullable(definitionLog);
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
new file mode 100644
index 0000000000..9cdf6c7de6
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
@@ -0,0 +1,30 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+/**
+ * Mapper-backed implementation of {@link ProcessTaskRelationLogDao}.
+ */
+@Repository
+public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao {
+
+    @Autowired
+    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+
+    /**
+     * Fetch the task relation logs the mapper returns for the given workflow definition code and version.
+     */
+    @Override
+    public List<ProcessTaskRelationLog> queryProcessTaskRelationLogByWorkflow(long workflowDefinitionCode,
+                                                                              int workflowDefinitionVersion) {
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationLogMapper
+                .queryByProcessCodeAndVersion(workflowDefinitionCode, workflowDefinitionVersion);
+        return relationLogs;
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
new file mode 100644
index 0000000000..205fc3b649
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
@@ -0,0 +1,63 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Mapper-backed implementation of {@link TaskDefinitionLogDao}.
+ */
+@Repository
+public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao {
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    /**
+     * Load the task definition logs referenced by the given workflow task relations.
+     * <p>
+     * Every positive pre/post task code is looked up once, using the version recorded on the
+     * relation. Lookups for which the mapper returns {@code null} are skipped, so the result never
+     * contains {@code null} elements.
+     *
+     * @param processTaskRelations relations to resolve, may be {@code null} or empty
+     * @return the resolved task definition logs; empty when there is nothing to resolve
+     */
+    @Override
+    public List<TaskDefinitionLog> queryTaskDefinitionByRelations(List<ProcessTaskRelationLog> processTaskRelations) {
+        List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
+        if (CollectionUtils.isEmpty(processTaskRelations)) {
+            return taskDefinitionLogs;
+        }
+        // Keyed by task code so a task referenced by several relations is only queried once.
+        Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+            if (processTaskRelation.getPreTaskCode() > 0) {
+                taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion());
+            }
+            if (processTaskRelation.getPostTaskCode() > 0) {
+                taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
+            }
+        }
+
+        for (Map.Entry<Long, Integer> taskCodeVersion : taskCodeVersionMap.entrySet()) {
+            TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper
+                    .queryByDefinitionCodeAndVersion(taskCodeVersion.getKey(), taskCodeVersion.getValue());
+            // The mapper may find no row for a code/version pair; do not leak a null element to callers.
+            if (taskDefinitionLog != null) {
+                taskDefinitionLogs.add(taskDefinitionLog);
+            }
+        }
+        return taskDefinitionLogs;
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index 510b5d7a71..0512980b10 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -3,6 +3,7 @@ package org.apache.dolphinscheduler.dao.repository.impl;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.exception.RepositoryException;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@@ -28,6 +29,17 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
         return taskInstanceMapper.selectBatchIds(taskInstanceIds);
     }
 
+    /**
+     * Query the task instances of the given workflow instance through
+     * {@code findValidTaskListByProcessId}, passing {@link Flag#YES} as the flag argument.
+     */
+    @Override
+    public List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId) {
+        List<TaskInstance> validTaskInstances =
+                taskInstanceMapper.findValidTaskListByProcessId(workflowInstanceId, Flag.YES);
+        return validTaskInstances;
+    }
+
     @Override
     public void updateTaskInstance(@NonNull TaskInstance taskInstance) throws RepositoryException {
         try {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index 3f25bb0f49..3ab01171e1 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -17,7 +17,9 @@
 
 package org.apache.dolphinscheduler.dao.utils;
 
+import com.google.common.collect.Sets;
 import lombok.NonNull;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -30,9 +32,11 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.commons.collections.CollectionUtils;
-
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -41,12 +45,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
 /**
  * dag tools
  */
@@ -591,4 +589,89 @@ public class DagHelper {
         }
         return false;
     }
+
+    /**
+     * Collect the given node together with every node reachable from it through subsequent edges.
+     * <p>
+     * e.g. for the DAG A -> B -> C and taskNode A, the result is {A, B, C}.
+     */
+    public static Set<String> getAllPostNodes(@NonNull String taskNode,
+                                              @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+        Set<String> reachableNodes = Sets.newHashSet();
+        Set<String> expandedNodes = new HashSet<>();
+        Set<String> frontier = Sets.newHashSet(taskNode);
+        // Level-by-level traversal: each pass moves the frontier one edge further downstream.
+        while (!frontier.isEmpty()) {
+            Set<String> nextFrontier = new HashSet<>();
+            for (String node : frontier) {
+                reachableNodes.add(node);
+                expandedNodes.add(node);
+                for (String successor : workflowDAG.getSubsequentNodes(node)) {
+                    if (!expandedNodes.contains(successor)) {
+                        nextFrontier.add(successor);
+                    }
+                }
+            }
+            frontier = nextFrontier;
+        }
+        return reachableNodes;
+    }
+
+    /**
+     * Collect the given node together with every node it can be reached from through previous edges.
+     * <p>
+     * e.g. for the DAG A -> B -> C and taskNode B, the result is {A, B}.
+     */
+    public static Set<String> getAllPreNodes(@NonNull String taskNode,
+                                             @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+        Set<String> ancestorNodes = Sets.newHashSet();
+        Set<String> expandedNodes = new HashSet<>();
+        Set<String> frontier = Sets.newHashSet(taskNode);
+        while (!frontier.isEmpty()) {
+            Set<String> nextFrontier = new HashSet<>();
+            for (String node : frontier) {
+                ancestorNodes.add(node);
+                expandedNodes.add(node);
+                for (String predecessor : workflowDAG.getPreviousNodes(node)) {
+                    if (!expandedNodes.contains(predecessor)) {
+                        nextFrontier.add(predecessor);
+                    }
+                }
+            }
+            frontier = nextFrontier;
+        }
+        return ancestorNodes;
+    }
+
+    /**
+     * Judge whether givenTaskNode is one of parentNodes or is reachable from any of them in the workflow DAG.
+     */
+    public static boolean isChildOfAnyParentNodes(@NonNull String givenTaskNode,
+                                                  @NonNull Set<String> parentNodes,
+                                                  @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+        if (CollectionUtils.isEmpty(parentNodes)) {
+            return false;
+        }
+        if (parentNodes.contains(givenTaskNode)) {
+            return true;
+        }
+        // Walk the DAG level by level from the parents, gathering every node reachable from them.
+        Set<String> descendants = new HashSet<>();
+        Set<String> expandedNodes = new HashSet<>();
+        Set<String> frontier = parentNodes;
+        while (CollectionUtils.isNotEmpty(frontier)) {
+            Set<String> nextFrontier = new HashSet<>();
+            for (String node : frontier) {
+                descendants.add(node);
+                expandedNodes.add(node);
+                for (String successor : workflowDAG.getSubsequentNodes(node)) {
+                    if (!expandedNodes.contains(successor)) {
+                        nextFrontier.add(successor);
+                    }
+                }
+            }
+            frontier = nextFrontier;
+        }
+        return descendants.contains(givenTaskNode);
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
new file mode 100644
index 0000000000..4a7100010e
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper">
+    <sql id="baseSql">
+        id
+        ,workflow_instance_id, workflow_instance_name, task_name, task_code, status, create_time, update_time
+    </sql>
+
+    <!-- Selects isolation tasks, optionally filtered by workflow instance name and/or task name; rows map to IsolationTask. -->
+    <select id="pageQuery" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+        select <include refid="baseSql"/>
+        from t_ds_isolation_task
+        where 1 = 1
+        <if test="workflowInstanceName != null">
+            and workflow_instance_name = #{workflowInstanceName}
+        </if>
+        <if test="taskName != null">
+            and task_name = #{taskName}
+        </if>
+    </select>
+
+    <select id="queryByTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+        select
+        <include refid="baseSql"/>
+        from t_ds_isolation_task
+        where workflow_instance_id = #{workflowInstanceId}
+        and task_code in
+        <foreach collection="taskCodes" item="taskCode" open="(" separator="," close=")">
+            #{taskCode}
+        </foreach>
+    </select>
+
+    <select id="queryByWorkflowInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+        select
+        <include refid="baseSql"/>
+        from t_ds_isolation_task
+        where workflow_instance_id = #{workflowInstanceId} and status = #{status}
+    </select>
+
+    <select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+        select
+        <include refid="baseSql"/>
+        from t_ds_isolation_task
+        where  status = #{status}
+    </select>
+
+    <update id="updateIsolationTaskStatus">
+        update t_ds_isolation_task
+        set status = #{status}
+        where id = #{id}
+    </update>
+
+    <insert id="batchInsert">
+        insert into t_ds_isolation_task (
+        workflow_instance_id,
+        workflow_instance_name,
+        task_name,
+        task_code,
+        status
+        )
+        values
+        <foreach collection="isolationTasks" item="isolationTask" separator=",">
+            (
+            #{isolationTask.workflowInstanceId},
+            #{isolationTask.workflowInstanceName},
+            #{isolationTask.taskName},
+            #{isolationTask.taskCode},
+            #{isolationTask.status}
+            )
+        </foreach>
+    </insert>
+
+    <delete id="deleteByIdAndStatus">
+        delete
+        from t_ds_isolation_task
+        where id = #{id}
+          and status = #{status}
+    </delete>
+
+</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index 2422318c10..8f6df6738c 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -1977,3 +1977,20 @@ CREATE TABLE t_ds_fav
     user_id   int         NOT NULL,
     PRIMARY KEY (id)
 );
+
+-- ----------------------------
+-- Table structure for t_ds_isolation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_isolation_task`;
+CREATE TABLE `t_ds_isolation_task`
+(
+    `id`                     bigint       NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `workflow_instance_id`   bigint       NOT NULL COMMENT 'workflowInstanceId of the isolation task',
+    `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
+    `task_name`              varchar(255) NOT NULL COMMENT 'isolation task name',
+    `task_code`              bigint       NOT NULL COMMENT 'isolation task code',
+    `status`                 tinyint(4) DEFAULT '0',
+    `create_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+    `update_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`)
+);
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 6df6a9b0fe..e5b69db827 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -1896,22 +1896,22 @@ CREATE TABLE `t_ds_k8s` (
 -- Table structure for t_ds_k8s_namespace
 -- ----------------------------
 DROP TABLE IF EXISTS `t_ds_k8s_namespace`;
-CREATE TABLE `t_ds_k8s_namespace` (
-  `id` int(11) NOT NULL AUTO_INCREMENT,
-  `limits_memory` int(11) DEFAULT NULL,
-  `namespace` varchar(100) DEFAULT NULL,
-  `online_job_num` int(11) DEFAULT NULL,
-  `owner` varchar(100) DEFAULT NULL,
-  `pod_replicas` int(11) DEFAULT NULL,
-  `pod_request_cpu` decimal(14,3) DEFAULT NULL,
-  `pod_request_memory` int(11) DEFAULT NULL,
-  `tag` varchar(100) DEFAULT NULL,
-  `limits_cpu` decimal(14,3) DEFAULT NULL,
-  `k8s` varchar(100) DEFAULT NULL,
-  `create_time` datetime DEFAULT NULL COMMENT 'create time',
-  `update_time` datetime DEFAULT NULL COMMENT 'update time',
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
+CREATE TABLE `t_ds_k8s_namespace`(
+                                     `id`                 int(11) NOT NULL AUTO_INCREMENT,
+                                     `limits_memory`      int(11) DEFAULT NULL,
+                                     `namespace`          varchar(100)   DEFAULT NULL,
+                                     `online_job_num`     int(11) DEFAULT NULL,
+                                     `owner`              varchar(100)   DEFAULT NULL,
+                                     `pod_replicas`       int(11) DEFAULT NULL,
+                                     `pod_request_cpu`    decimal(14, 3) DEFAULT NULL,
+                                     `pod_request_memory` int(11) DEFAULT NULL,
+                                     `tag`                varchar(100)   DEFAULT NULL,
+                                     `limits_cpu`         decimal(14, 3) DEFAULT NULL,
+                                     `k8s`                varchar(100)   DEFAULT NULL,
+                                     `create_time`        datetime       DEFAULT NULL COMMENT 'create time',
+                                     `update_time`        datetime       DEFAULT NULL COMMENT 'update time',
+                                     PRIMARY KEY (`id`),
+                                     UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
 ) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
 
 
@@ -1919,21 +1919,22 @@ CREATE TABLE `t_ds_k8s_namespace` (
 -- Table structure for t_ds_alert_send_status
 -- ----------------------------
 DROP TABLE IF EXISTS t_ds_alert_send_status;
-CREATE TABLE t_ds_alert_send_status(
-   `id`                       int(11) NOT NULL AUTO_INCREMENT,
-   `alert_id`                 int(11) NOT NULL,
-   `alert_plugin_instance_id` int(11) NOT NULL,
-   `send_status`              tinyint(4) DEFAULT '0',
-   `log`                      text,
-   `create_time`              datetime DEFAULT NULL COMMENT 'create time',
-   PRIMARY KEY (`id`),
-   UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)
-) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
+CREATE TABLE t_ds_alert_send_status
+(
+    `id`                       int(11) NOT NULL AUTO_INCREMENT,
+    `alert_id`                 int(11) NOT NULL,
+    `alert_plugin_instance_id` int(11) NOT NULL,
+    `send_status`              tinyint(4) DEFAULT '0',
+    `log`                      text,
+    `create_time`              datetime DEFAULT NULL COMMENT 'create time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
 -- ----------------------------
 -- Table structure for t_ds_fav_task
 -- ----------------------------
 DROP TABLE IF EXISTS `t_ds_fav_task`;
 CREATE TABLE `t_ds_fav_task`
 (
     `id`        bigint      NOT NULL AUTO_INCREMENT COMMENT 'favorite task id',
@@ -1943,3 +1944,21 @@ CREATE TABLE `t_ds_fav_task`
 ) ENGINE = InnoDB
   AUTO_INCREMENT = 1
   DEFAULT CHARSET = utf8;
+
+-- ----------------------------
+-- Table structure for t_ds_isolation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_isolation_task`;
+CREATE TABLE `t_ds_isolation_task`
+(
+    `id`                     bigint       NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `workflow_instance_id`   bigint       NOT NULL COMMENT 'workflowInstanceId of the isolation task',
+    `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
+    `task_name`              varchar(255) NOT NULL COMMENT 'isolation task name',
+    `task_code`              bigint       NOT NULL COMMENT 'isolation task code',
+    `status`                 tinyint(4) DEFAULT '0',
+    `create_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+    `update_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`),
+    KEY `workflow_instance_index` (`workflow_instance_id`) USING BTREE
+) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index 15bf31236a..f9bb5587cc 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -1941,3 +1941,21 @@ CREATE TABLE t_ds_fav
     user_id   int         NOT NULL,
     PRIMARY KEY (id)
 );
+
+-- ----------------------------
+-- Table structure for t_ds_isolation_task
+-- ----------------------------
+DROP TABLE IF EXISTS t_ds_isolation_task;
+CREATE TABLE t_ds_isolation_task
+(
+    id                     bigserial    NOT NULL,                  -- primary key
+    workflow_instance_id   bigint       NOT NULL,                  -- workflowInstanceId of the isolation task
+    workflow_instance_name varchar(255) NOT NULL,                  -- workflowInstanceName of the isolation task
+    task_name              varchar(255) NOT NULL,                  -- isolation task name
+    task_code              bigint       NOT NULL,                  -- isolation task code
+    status                 int          DEFAULT 0,
+    create_time            timestamp    DEFAULT CURRENT_TIMESTAMP,
+    update_time            timestamp    DEFAULT CURRENT_TIMESTAMP, -- PostgreSQL has no ON UPDATE clause; writers must set it
+    PRIMARY KEY (id)
+);
+CREATE INDEX workflow_instance_index ON t_ds_isolation_task (workflow_instance_id);
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
index a80660b549..de1890f150 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.dolphinscheduler.dao.utils;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Sets;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.graph.DAG;
@@ -32,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
+import org.junit.Assert;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,10 +42,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
 
 /**
  * dag helper test
@@ -335,9 +334,8 @@ public class DagHelperTest {
      * 2->8->5->7
      *
      * @return dag
-     * @throws JsonProcessingException if error throws JsonProcessingException
      */
-    private DAG<String, TaskNode, TaskNodeRelation> generateDag() throws IOException {
+    private DAG<String, TaskNode, TaskNodeRelation> generateDag() {
         List<TaskNode> taskNodeList = new ArrayList<>();
         TaskNode node1 = new TaskNode();
         node1.setId("1");
@@ -529,4 +527,32 @@ public class DagHelperTest {
         Assert.assertNotNull(dag);
     }
 
+    @Test
+    public void getAllPostNodes() {
+        DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
+        Set<String> allPostNodes = DagHelper.getAllPostNodes("2", dag);
+        Assert.assertEquals(Sets.newHashSet("2", "3", "5", "6", "7", "8"), allPostNodes);
+    }
+
+    @Test
+    public void getAllPreNodes() {
+        DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
+        Set<String> allPreNodes = DagHelper.getAllPreNodes("2", dag);
+        Assert.assertEquals(Sets.newHashSet("1", "2"), allPreNodes);
+    }
+
+    @Test
+    public void isChildOfAnyParentNodes() {
+        DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
+        // direct child of a parent node
+        Assert.assertTrue(DagHelper.isChildOfAnyParentNodes("2", Sets.newHashSet("1"), dag));
+        // a node that is itself one of the parent nodes
+        Assert.assertTrue(DagHelper.isChildOfAnyParentNodes("2", Sets.newHashSet("2"), dag));
+        // transitive descendant of a parent node
+        Assert.assertTrue(DagHelper.isChildOfAnyParentNodes("7", Sets.newHashSet("2"), dag));
+        // an upstream node is not a child of its own descendant
+        Assert.assertFalse(DagHelper.isChildOfAnyParentNodes("1", Sets.newHashSet("2"), dag));
+        // no parent nodes at all
+        Assert.assertFalse(DagHelper.isChildOfAnyParentNodes("2", Sets.newHashSet(), dag));
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
new file mode 100644
index 0000000000..826a456fd8
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
@@ -0,0 +1,37 @@
+package org.apache.dolphinscheduler.server.master.processor.isolation;
+
+import io.netty.channel.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Netty processor for {@link CommandType#REFRESH_ISOLATION_REQUEST}: on such a command it calls
+ * {@code refreshIsolationTaskMapFromDB()} on the {@link IsolationTaskManager}.
+ */
+@Slf4j
+@Component
+public class RefreshIsolationTaskProcessor implements NettyRequestProcessor {
+
+    @Autowired
+    private IsolationTaskManager isolationTaskManager;
+
+    /**
+     * Trigger a refresh of the isolation tasks held by the manager.
+     *
+     * @throws IllegalArgumentException if the command type is not REFRESH_ISOLATION_REQUEST
+     */
+    @Override
+    public void process(Channel channel, Command command) {
+        CommandType commandType = command.getType();
+        if (CommandType.REFRESH_ISOLATION_REQUEST != commandType) {
+            String errorMessage = String.format("The current rpc command: %s is invalidated", command);
+            throw new IllegalArgumentException(errorMessage);
+        }
+        isolationTaskManager.refreshIsolationTaskMapFromDB();
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index c2eb200059..eb4d82cd3d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.rpc;
 
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingData
 
 import javax.annotation.PostConstruct;
 
+import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationTaskProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -74,6 +76,9 @@ public class MasterRPCServer implements AutoCloseable {
     @Autowired
     private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
 
+    @Autowired
+    private RefreshIsolationTaskProcessor refreshIsolationTaskProcessor;
+
     public void start() {
         logger.info("Starting Master RPC Server...");
         // init remoting server
@@ -96,6 +101,9 @@ public class MasterRPCServer implements AutoCloseable {
         this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
 
+        this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_REQUEST,
+                refreshIsolationTaskProcessor);
+
         this.nettyRemotingServer.start();
         logger.info("Started Master RPC Server...");
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 5546b474d7..6446248c06 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
 import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,9 @@ public class FailoverExecuteThread extends BaseDaemonThread {
     @Autowired
     private MasterFailoverService masterFailoverService;
 
+    @Autowired
+    private IsolationTaskManager isolationTaskManager;
+
     protected FailoverExecuteThread() {
         super("FailoverExecuteThread");
     }
@@ -66,6 +70,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
                 // todo: DO we need to schedule a task to do this kind of check
                 // This kind of check may only need to be executed when a master server start
                 masterFailoverService.checkMasterFailover();
+                isolationTaskManager.refreshIsolationTaskMapFromDB();
             } catch (Exception e) {
                 logger.error("Master failover thread execute error", e);
             } finally {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 24982437a2..1d823a71a7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@@ -108,6 +109,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
     @Autowired
     private StateEventCallbackService stateEventCallbackService;
 
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
     private String masterAddress;
 
     protected MasterSchedulerBootstrap() {
@@ -184,7 +188,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
                                 processAlertManager,
                                 masterConfig,
                                 stateWheelExecuteThread,
-                                curingGlobalParamsService);
+                                curingGlobalParamsService,
+                                isolationTaskDao);
                         processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                         workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                                 processInstance.getId()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index d05bdf8015..e102238178 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -37,8 +37,10 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.Environment;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
@@ -47,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@@ -188,6 +191,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     private final Set<String> stateCleanTaskCodes = new HashSet<>();
 
+    // isolationTaskCode -> isolatedTimes
+    private final Map<Long, Integer> isolationTaskCodesToTimesMap = new HashMap<>();
+
     /**
      * state event queue
      */
@@ -208,6 +214,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     private final CuringParamsService curingParamsService;
 
+    private final IsolationTaskDao isolationTaskDao;
+
     private final String masterAddress;
 
     /**
@@ -227,7 +235,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                                    @NonNull ProcessAlertManager processAlertManager,
                                    @NonNull MasterConfig masterConfig,
                                    @NonNull StateWheelExecuteThread stateWheelExecuteThread,
-                                   @NonNull CuringParamsService curingParamsService) {
+                                   @NonNull CuringParamsService curingParamsService,
+                                   @NonNull IsolationTaskDao isolationTaskDao) {
         this.processService = processService;
         this.processInstanceDao = processInstanceDao;
         this.processInstance = processInstance;
@@ -235,6 +244,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         this.processAlertManager = processAlertManager;
         this.stateWheelExecuteThread = stateWheelExecuteThread;
         this.curingParamsService = curingParamsService;
+        this.isolationTaskDao = isolationTaskDao;
         this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
         TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
     }
@@ -435,6 +445,84 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
     }
 
+    /**
+     * Bring an isolation online for the given task.
+     * <p>
+     * Every node returned by {@code DagHelper.getAllPostNodes} for {@code taskCode} gets its isolation counter
+     * incremented. Nodes that already own an active task processor are additionally asked to isolate themselves,
+     * and a TASK_STATE_CHANGE event carrying their current state is queued for this workflow instance.
+     *
+     * @param taskCode code of the task the isolation was declared on
+     */
+    public void onlineTaskIsolation(long taskCode) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            // find the post running task
+            Set<String> postTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
+            // if the current task is finished, kill the post task
+            // we need to submit an online isolation task event, otherwise there may exist concurrent problem
+            for (String postTaskCodeStr : postTaskCodes) {
+                Long postTaskCode = Long.valueOf(postTaskCodeStr);
+                // one more online isolation now covers this node
+                isolationTaskCodesToTimesMap.merge(postTaskCode, 1, Integer::sum);
+
+                ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(postTaskCode);
+                if (taskProcessor != null) {
+                    taskProcessor.action(TaskAction.ISOLATE);
+                    // publish the state the processor ended up in so the workflow reacts to it
+                    StateEvent isolateEvent = new StateEvent();
+                    isolateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+                    isolateEvent.setProcessInstanceId(this.processInstance.getId());
+                    isolateEvent.setTaskInstanceId(taskProcessor.taskInstance().getId());
+                    isolateEvent.setExecutionStatus(taskProcessor.taskInstance().getState());
+                    this.addStateEvent(isolateEvent);
+                } else {
+                    logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}",
+                            postTaskCode);
+                }
+            }
+        } finally {
+            LoggerUtils.removeWorkflowInstanceIdMDC();
+        }
+    }
+
+    /**
+     * Take an isolation offline for the given task.
+     * <p>
+     * For every node returned by {@code DagHelper.getAllPostNodes} for {@code taskCode} the isolation counter is
+     * decremented. When the last isolation covering a node is removed and the node already has a task processor,
+     * a task in PAUSE_BY_ISOLATION is reset to SUBMITTED_SUCCESS and re-queued, while a task in KILL_BY_ISOLATION
+     * is cloned and the clone is re-queued.
+     *
+     * @param taskCode code of the task whose isolation is being cancelled
+     */
+    public void cancelTaskIsolation(long taskCode) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            // todo:
+            // restart the killed/ paused task
+            Set<String> needToOfflineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
+            for (String needToOfflineIsolationTaskCodeStr : needToOfflineIsolationTaskCodes) {
+                Long isolationTaskCode = Long.valueOf(needToOfflineIsolationTaskCodeStr);
+                Integer isolateTimes = isolationTaskCodesToTimesMap.get(isolationTaskCode);
+                if (isolateTimes == null) {
+                    logger.warn(
+                            "The current task has not been isolated, so it don't need to offline isolation, taskCode: {}",
+                            isolationTaskCode);
+                    continue;
+                }
+                if (isolateTimes != 1) {
+                    // still covered by another online isolation, only decrease the counter
+                    isolationTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1);
+                    continue;
+                }
+                isolationTaskCodesToTimesMap.remove(isolationTaskCode);
+                ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
+                if (iTaskProcessor == null) {
+                    // the current task has not been submitted
+                    continue;
+                }
+                // the current task has no pre isolation, restart
+                TaskInstance taskInstance = iTaskProcessor.taskInstance();
+                if (taskInstance.getState() == ExecutionStatus.PAUSE_BY_ISOLATION) {
+                    taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+                    addTaskToStandByList(taskInstance);
+                    logger.info(
+                            "Cancel isolation task, the current task state is pause_by_isolation, change to submitted_success and add it back to standbyList");
+                    continue;
+                }
+                if (taskInstance.getState() == ExecutionStatus.KILL_BY_ISOLATION) {
+                    // the clone helper is @Nullable (task node missing from the dag); never queue a null instance
+                    TaskInstance recoveredTaskInstance = cloneCancelIsolationTaskInstance(taskInstance);
+                    if (recoveredTaskInstance == null) {
+                        logger.error(
+                                "Cancel isolation task failed, cannot clone the task instance killed by isolation, taskCode: {}",
+                                isolationTaskCode);
+                        continue;
+                    }
+                    addTaskToStandByList(recoveredTaskInstance);
+                    logger.info(
+                            "Cancel isolation task, the current task state is kill_by_isolation, change to submitted_success and add it back to standbyList");
+                }
+            }
+        } finally {
+            LoggerUtils.removeWorkflowInstanceIdMDC();
+        }
+
+    }
+
     /**
      * crate new task instance to retry, different objects from the original
      *
@@ -798,6 +886,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
         // generate process dag
         dag = DagHelper.buildDagGraph(processDag);
+        // Rebuild the isolation counters from the ONLINE isolation tasks recorded for this workflow instance.
+        List<IsolationTask> isolationTasks =
+                isolationTaskDao.queryByWorkflowInstanceId(processInstance.getId(), IsolationTaskStatus.ONLINE);
+        for (IsolationTask isolationTask : isolationTasks) {
+            Set<String> allPostNodes = DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag);
+            for (String postNode : allPostNodes) {
+                // The counter is keyed by the code of each post node (as in onlineTaskIsolation,
+                // cancelTaskIsolation and submitTaskExec), not by the code of the isolation task itself.
+                isolationTaskCodesToTimesMap.merge(Long.valueOf(postNode), 1, Integer::sum);
+            }
+        }
         logger.info("Build dag success, dag: {}", dag);
     }
 
@@ -933,6 +1030,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      */
     private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
         try {
+            if (isolationTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) {
+                taskInstance.setState(ExecutionStatus.PAUSE_BY_ISOLATION);
+                logger.info("The current task has been isolated, will set status to PAUSE_BY_ISOLATION, taskCode: {}",
+                        taskInstance.getTaskCode());
+            }
+
             // package task instance before submit
             processService.packageTaskInstance(taskInstance, processInstance);
 
@@ -1117,6 +1220,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         return newTaskInstance;
     }
 
+    /**
+     * Create a new task instance from one that was killed by isolation, so that it can be queued again when the
+     * isolation is cancelled.
+     *
+     * @param taskInstance the task instance that was killed by isolation
+     * @return the cloned task instance, or {@code null} when the task's node cannot be found in the dag
+     */
+    private @Nullable TaskInstance cloneCancelIsolationTaskInstance(TaskInstance taskInstance) {
+        TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
+        if (taskNode == null) {
+            logger.error("clone cancel isolation task instance error, taskNode is null, code:{}",
+                    taskInstance.getTaskCode());
+            return null;
+        }
+        TaskInstance newTaskInstance = TaskInstanceUtils.cloneTaskInstance(processInstance, taskNode, taskInstance);
+        // NOTE(review): the environment resolved for the clone is injected into the ORIGINAL taskInstance, not into
+        // newTaskInstance - presumably the clone should receive it; confirm against TaskInstanceUtils.injectEnvironment.
+        TaskInstanceUtils.injectEnvironment(taskInstance, getTaskInstanceEnvironment(newTaskInstance));
+        return newTaskInstance;
+    }
+
     private @Nullable Environment getTaskInstanceEnvironment(@NonNull TaskInstance taskInstance) {
         Environment environment = null;
         if (!taskInstance.getEnvironmentCode().equals(-1L)) {
@@ -1468,17 +1582,21 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
         // success
         if (state == ExecutionStatus.RUNNING_EXECUTION) {
-            List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
             if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
                 // tasks currently pending submission, no retries, indicating that depend is waiting to complete
                 return ExecutionStatus.RUNNING_EXECUTION;
-            } else if (CollectionUtils.isNotEmpty(killTasks)) {
+            }
+            if (CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.KILL))) {
                 // tasks maybe killed manually
                 return ExecutionStatus.FAILURE;
-            } else {
-                // if the waiting queue is empty and the status is in progress, then success
-                return ExecutionStatus.SUCCESS;
             }
+            if (CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.KILL_BY_ISOLATION))
+                    || CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.PAUSE_BY_ISOLATION))) {
+                // No task need to submit, and exist isolation task, the workflow instance need to be PAUSE_BY_ISOLATION
+                return ExecutionStatus.PAUSE_BY_ISOLATION;
+            }
+            // if the waiting queue is empty and the status is in progress, then success
+            return ExecutionStatus.SUCCESS;
         }
 
         return state;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index ad3d42a3a5..e86a93ffc1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
@@ -111,6 +112,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     protected boolean timeout = false;
 
+    protected boolean isolated = false;
+
     protected TaskInstance taskInstance = null;
 
     protected ProcessInstance processInstance;
@@ -160,6 +163,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
      */
     protected abstract boolean killTask();
 
+    protected abstract boolean isolateTask();
+
     protected abstract boolean taskTimeout();
 
     protected abstract boolean runTask();
@@ -193,6 +198,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
                 case DISPATCH:
                     result = dispatch();
                     break;
+                case ISOLATE:
+                    result = isolate();
+                    break;
                 default:
                     logger.error("unknown task action: {}", taskAction);
             }
@@ -256,6 +264,14 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         return paused;
     }
 
+    /**
+     * Isolate this task at most once: {@link #isolateTask()} is only invoked while the {@code isolated} flag is
+     * still false, and its result becomes the new value of the flag.
+     *
+     * @return true if the task is isolated (now or by an earlier call)
+     */
+    private boolean isolate() {
+        if (!isolated) {
+            isolated = isolateTask();
+        }
+        return isolated;
+    }
+
     protected boolean stop() {
         if (killed) {
             return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index 5fcad28858..3285623186 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -21,7 +21,6 @@ import com.google.auto.service.AutoService;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.BlockingOpportunity;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -30,7 +29,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
 import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
-import org.apache.dolphinscheduler.server.utils.LogUtils;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -89,6 +87,15 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Isolate the blocking task: mark the task instance as KILL_BY_ISOLATION, stamp its end time and persist it.
+     *
+     * @return always true
+     */
+    @Override
+    protected boolean isolateTask() {
+        taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+        taskInstance.setEndTime(new Date());
+        processService.saveTaskInstance(taskInstance);
+        logger.info("Blocking task has been isolated");
+        return true;
+    }
+
     @Override
     protected boolean taskTimeout() {
         return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 7b45be6373..e1ef11eef3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
+import com.google.auto.service.AutoService;
 import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -34,8 +35,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 
 import java.util.Date;
 
-import com.google.auto.service.AutoService;
-
 /**
  * common task processor
  */
@@ -130,6 +129,27 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Isolate a common task. A task that is already finished is left untouched; otherwise the task instance is
+     * marked KILL_BY_ISOLATION, persisted, and - when it already has a host - the remote task is killed.
+     *
+     * @return true when the task was already finished or was isolated, false if any step threw an exception
+     */
+    @Override
+    protected boolean isolateTask() {
+        try {
+            // nothing to isolate once the task reached a final state
+            if (taskInstance.getState().typeIsFinished()) {
+                return true;
+            }
+            taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+            taskInstance.setEndTime(new Date());
+            processService.updateTaskInstance(taskInstance);
+            // a host is only set once the task has been dispatched - presumably to a worker; TODO confirm
+            if (StringUtils.isNotEmpty(taskInstance.getHost())) {
+                killRemoteTask();
+            }
+            logger.info("Master isolate taskInstance success, taskName: {} taskInstanceId: {}", taskInstance.getName(),
+                    taskInstance.getId());
+            return true;
+        } catch (Exception e) {
+            logger.error("Master isolate task error, taskInstance id: {}", taskInstance.getId(), e);
+            return false;
+        }
+    }
+
     @Override
     protected boolean taskTimeout() {
         return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 7d3669c443..5c76496aa3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.server.master.runner.task;
 
 import com.google.auto.service.AutoService;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -27,7 +26,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
 import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
 import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
-import org.apache.dolphinscheduler.server.utils.LogUtils;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -106,6 +104,15 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Isolate the condition task: mark the task instance as KILL_BY_ISOLATION, stamp its end time and persist it.
+     *
+     * @return always true
+     */
+    @Override
+    protected boolean isolateTask() {
+        taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+        taskInstance.setEndTime(new Date());
+        processService.saveTaskInstance(taskInstance);
+        logger.info("Condition task has been isolated");
+        return true;
+    }
+
     @Override
     public String getType() {
         return TASK_TYPE_CONDITIONS;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 6dc5cc6b7f..d5fe056820 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -202,6 +202,14 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Isolate the dependent task: mark the task instance as KILL_BY_ISOLATION, stamp its end time and persist it.
+     *
+     * @return always true
+     */
+    @Override
+    protected boolean isolateTask() {
+        taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+        taskInstance.setEndTime(new Date());
+        processService.saveTaskInstance(taskInstance);
+        return true;
+    }
+
     /**
      * judge all dependent tasks finish
      *
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 754eba7d50..a681366ea5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -201,6 +201,17 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Isolate the sub-process task. Isolation of the sub process itself is not implemented yet: the method looks up
+     * the sub process instance and returns true on every path without changing any state.
+     *
+     * @return always true
+     */
+    @Override
+    protected boolean isolateTask() {
+        ProcessInstance subProcessInstance =
+                processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
+        // no sub process instance found, or the task already finished: nothing to isolate
+        if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) {
+            return true;
+        }
+        // todo: isolate sub process
+        return true;
+    }
+
     private void sendToSubProcess() {
         StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
                 processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index e3ba4c4db0..8ed3c42ce8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -85,6 +85,14 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Isolate the switch task: mark the task instance as KILL_BY_ISOLATION, stamp its end time and persist it.
+     *
+     * @return always true
+     */
+    @Override
+    protected boolean isolateTask() {
+        taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+        taskInstance.setEndTime(new Date());
+        processService.saveTaskInstance(taskInstance);
+        return true;
+    }
+
     @Override
     protected boolean taskTimeout() {
         return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
index 9044945258..2c60d364bc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
@@ -22,6 +22,7 @@ package org.apache.dolphinscheduler.server.master.runner.task;
  */
 public enum TaskAction {
     PAUSE,
+    ISOLATE,
     STOP,
     TIMEOUT,
     SUBMIT,
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
new file mode 100644
index 0000000000..2621766d84
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
@@ -0,0 +1,97 @@
+package org.apache.dolphinscheduler.server.master.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Keeps an in-memory snapshot of the ONLINE isolation tasks. On every refresh the snapshot is compared with the
+ * database, and the difference is forwarded to the workflow instances held in the
+ * {@link ProcessInstanceExecCacheManager}.
+ */
+@Slf4j
+@Service
+public class IsolationTaskManager {
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    // isolationTaskId -> isolation task, as seen by the last refresh
+    private volatile Map<Long, IsolationTask> onlineIsolationTasksInMemory = new HashMap<>();
+
+    // ids of the isolation tasks seen by the last refresh
+    private volatile Set<Long> currentIsolationTaskIdsInMemory = new HashSet<>();
+
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
+    /**
+     * Load the initial snapshot once the bean has been constructed.
+     */
+    @PostConstruct
+    public void init() {
+        refreshIsolationTaskMapFromDB();
+    }
+
+    /**
+     * Reload the ONLINE isolation tasks from the database, replace the in-memory snapshot, then cancel the
+     * isolations that disappeared and apply the ones that are new.
+     */
+    public void refreshIsolationTaskMapFromDB() {
+        Map<Long, IsolationTask> onlineTasksInDb = isolationTaskDao.queryByStatus(IsolationTaskStatus.ONLINE)
+                .stream()
+                .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
+        Set<Long> onlineTaskIdsInDb = onlineTasksInDb.keySet();
+
+        Map<Long, IsolationTask> previousTasks = onlineIsolationTasksInMemory;
+        Set<Long> previousTaskIds = currentIsolationTaskIdsInMemory;
+
+        // known in memory but no longer ONLINE in the database
+        Collection<IsolationTask> tasksToOffline = previousTaskIds.stream()
+                .filter(taskId -> !onlineTaskIdsInDb.contains(taskId))
+                .map(previousTasks::get)
+                .collect(Collectors.toList());
+
+        // ONLINE in the database but not yet known in memory
+        Collection<IsolationTask> tasksToOnline = onlineTaskIdsInDb.stream()
+                .filter(taskId -> !previousTaskIds.contains(taskId))
+                .map(onlineTasksInDb::get)
+                .collect(Collectors.toList());
+
+        currentIsolationTaskIdsInMemory = onlineTaskIdsInDb;
+        onlineIsolationTasksInMemory = onlineTasksInDb;
+
+        offlineIsolationTask(tasksToOffline);
+        onlineIsolationTask(tasksToOnline);
+    }
+
+    /**
+     * Cancel the given isolations on the workflow instances that are present in the exec cache; isolation tasks
+     * whose workflow instance is not cached are skipped.
+     */
+    private void offlineIsolationTask(Collection<IsolationTask> needOfflineIsolationTasks) {
+        if (CollectionUtils.isEmpty(needOfflineIsolationTasks)) {
+            return;
+        }
+        for (IsolationTask isolationTask : needOfflineIsolationTasks) {
+            WorkflowExecuteRunnable workflowRunnable =
+                    processInstanceExecCacheManager.getByProcessInstanceId(isolationTask.getWorkflowInstanceId());
+            if (workflowRunnable != null) {
+                workflowRunnable.cancelTaskIsolation(isolationTask.getTaskCode());
+                log.info("Backend offline isolation task, isolationTaskId: {}", isolationTask.getId());
+            }
+        }
+    }
+
+    /**
+     * Apply the given isolations on the workflow instances that are present in the exec cache; isolation tasks
+     * whose workflow instance is not cached are skipped.
+     */
+    private void onlineIsolationTask(Collection<IsolationTask> needOnlineIsolationTasks) {
+        if (CollectionUtils.isEmpty(needOnlineIsolationTasks)) {
+            return;
+        }
+        for (IsolationTask isolationTask : needOnlineIsolationTasks) {
+            WorkflowExecuteRunnable workflowRunnable =
+                    processInstanceExecCacheManager.getByProcessInstanceId(isolationTask.getWorkflowInstanceId());
+            if (workflowRunnable != null) {
+                workflowRunnable.onlineTaskIsolation(isolationTask.getTaskCode());
+                log.info("Backend online isolation task, isolationTaskId: {}", isolationTask.getId());
+            }
+        }
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java
index 5d86abc8e4..e4bdf3511c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java
@@ -166,7 +166,6 @@ public class TaskInstanceUtils {
         taskInstance.setTaskDefine(oldTaskInstance.getTaskDefine());
         taskInstance.setProcessDefine(oldTaskInstance.getProcessDefine());
         taskInstance.setProcessInstance(processInstance);
-        taskInstance.setState(oldTaskInstance.getState());
         return taskInstance;
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
index e1088bf83d..00ae91019d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
@@ -36,6 +36,8 @@ public class WorkflowInstanceUtils {
                 return getStartTaskInstanceIdsFromRecoverParam(processInstance);
             case START_FROM_STATE_CLEAN_TASKS:
                 return getStartTaskInstanceIdsFromStateCleanParam(processInstance);
+            case RECOVERY_FROM_ISOLATION_TASKS:
+                return getStartTaskInstanceIdsFromRecoverIsolationParam(processInstance);
             default:
                 return Collections.emptyList();
         }
@@ -69,6 +71,41 @@ public class WorkflowInstanceUtils {
         return stateCleanTaskInstanceIds;
     }
 
+    /**
+     * Collect the task instance ids a workflow instance should start from when it is recovered from isolation.
+     * <p>
+     * The ids are read from the workflow instance's command param: first the list stored under
+     * {@link Constants#CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS}, then the list stored under
+     * {@link Constants#CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS}. Either list may be absent, in which case it
+     * contributes nothing to the result instead of triggering a NullPointerException.
+     *
+     * @param processInstance workflow instance whose command param is parsed, must not be null
+     * @return ids from the paused list followed by ids from the killed list; empty if neither is present
+     */
+    public static List<Integer> getStartTaskInstanceIdsFromRecoverIsolationParam(@NonNull ProcessInstance processInstance) {
+        List<Integer> result = new ArrayList<>();
+        Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
+        if (commandParamMap == null) {
+            // nothing could be parsed from the command param, so there is nothing to recover from
+            return result;
+        }
+        List<Integer> recoveryPausedIsolationIds =
+                JSONUtils.parseObject(commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
+                        new TypeReference<ArrayList<Integer>>() {
+                        });
+        if (recoveryPausedIsolationIds != null) {
+            result.addAll(recoveryPausedIsolationIds);
+        }
+        List<Integer> recoveryKilledIsolationIds =
+                JSONUtils.parseObject(commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
+                        new TypeReference<ArrayList<Integer>>() {
+                        });
+        if (recoveryKilledIsolationIds != null) {
+            result.addAll(recoveryKilledIsolationIds);
+        }
+        return result;
+    }
+
     public static List<String> getStartNodeName(@NonNull ProcessInstance processInstance) {
         List<String> startNodeNameList = new ArrayList<>();
         Map<String, String> paramMap = JSONUtils.toMap(processInstance.getCommandParam());
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 6f55e1d5b5..856d38cb2f 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -27,6 +27,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.common.graph.DAG;
@@ -92,6 +93,8 @@ public class WorkflowExecuteRunnableTest {
 
     private CuringParamsService curingGlobalParamsService;
 
+    private IsolationTaskDao isolationTaskDao;
+
     @Before
     public void init() throws Exception {
         applicationContext = mock(ApplicationContext.class);
@@ -124,10 +127,11 @@ public class WorkflowExecuteRunnableTest {
         curingGlobalParamsService = mock(CuringParamsService.class);
         NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
         ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
+        isolationTaskDao = mock(IsolationTaskDao.class);
         workflowExecuteThread =
                 PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao,
                         nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread,
-                        curingGlobalParamsService));
+                        curingGlobalParamsService, isolationTaskDao));
         // prepareProcess init dag
         Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
         dag.setAccessible(true);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 37f0b97f22..daf92071b5 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -119,5 +119,9 @@ public enum CommandType {
     /**
      * workflow executing data response, from master to api
      */
-    WORKFLOW_EXECUTING_DATA_RESPONSE;
+    WORKFLOW_EXECUTING_DATA_RESPONSE,
+
+    REFRESH_ISOLATION_REQUEST,
+    ;
+
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
new file mode 100644
index 0000000000..9b097843c4
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.remote.command.isolation;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.io.Serializable;
+
+public class RefreshIsolationTaskRequest implements Serializable {
+
+    /** Wraps this request, JSON-serialized, as the body of a REFRESH_ISOLATION_REQUEST command. */
+    public Command convert2Command() {
+        Command refreshCommand = new Command();
+        refreshCommand.setType(CommandType.REFRESH_ISOLATION_REQUEST);
+        refreshCommand.setBody(JSONUtils.toJsonByteArray(this));
+        return refreshCommand;
+    }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 009b872681..c1fe78e78a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1047,6 +1047,27 @@ public class ProcessServiceImpl implements ProcessService {
                 processInstance.setRunTimes(runTime + 1);
                 processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 break;
+            case RECOVERY_FROM_ISOLATION_TASKS:
+                Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
+                List<Integer> recoveryPausedIsolationIds =
+                        JSONUtils.parseObject(
+                                commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
+                                new TypeReference<ArrayList<Integer>>() {
+                                });
+                if (CollectionUtils.isNotEmpty(recoveryPausedIsolationIds)) {
+                    recoveryPausedIsolationIds.forEach(id -> initTaskInstance(findTaskInstanceById(id)));
+                }
+                List<Integer> recoveryKilledIsolationIds =
+                        JSONUtils.parseObject(
+                                commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
+                                new TypeReference<ArrayList<Integer>>() {
+                                });
+                if (CollectionUtils.isNotEmpty(recoveryKilledIsolationIds)) {
+                    recoveryKilledIsolationIds.forEach(id -> initTaskInstance(findTaskInstanceById(id)));
+                }
+                processInstance.setRunTimes(runTime + 1);
+                processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                break;
             default:
                 break;
         }
@@ -1564,9 +1585,6 @@ public class ProcessServiceImpl implements ProcessService {
                     taskInstance.getTaskCode());
             return null;
         }
-        if (processInstanceState == ExecutionStatus.READY_PAUSE) {
-            taskInstance.setState(ExecutionStatus.PAUSE);
-        }
         taskInstance.setExecutorId(processInstance.getExecutorId());
         taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
         if (taskInstance.getSubmitTime() == null) {
@@ -1603,7 +1621,8 @@ public class ProcessServiceImpl implements ProcessService {
         if (state == ExecutionStatus.RUNNING_EXECUTION
                 || state == ExecutionStatus.DELAY_EXECUTION
                 || state == ExecutionStatus.KILL
-                || state == ExecutionStatus.DISPATCH) {
+                || state == ExecutionStatus.DISPATCH
+                || state == ExecutionStatus.PAUSE_BY_ISOLATION) {
             return state;
         }
         // return pasue /stop if process instance state is ready pause / stop
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
index da728b2fdb..a83184e16b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
@@ -17,10 +17,10 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.enums;
 
-import java.util.HashMap;
-
 import com.baomidou.mybatisplus.annotation.EnumValue;
 
+import java.util.HashMap;
+
 /**
  * running status for workflow and task nodes
  */
@@ -65,6 +65,8 @@ public enum ExecutionStatus {
     READY_BLOCK(15, "ready block"),
     BLOCK(16, "block"),
     DISPATCH(17, "dispatch"),
+    PAUSE_BY_ISOLATION(18, "paused by isolation"),
+    KILL_BY_ISOLATION(19, "killed by isolation"),
     ;
 
     ExecutionStatus(int code, String descp) {
@@ -117,10 +119,14 @@ public enum ExecutionStatus {
      * @return status
      */
     public boolean typeIsFinished() {
-        return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
+        return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() || typeIsPauseByIsolation()
                 || typeIsStop() || typeIsBlock();
     }
 
+    public boolean typeIsReady() { // true for READY_PAUSE, READY_STOP and READY_BLOCK only
+        return READY_PAUSE == this || READY_STOP == this || READY_BLOCK == this;
+    }
+
     /**
      * status is waiting thread
      *
@@ -139,6 +145,18 @@ public enum ExecutionStatus {
         return this == PAUSE;
     }
 
+    public boolean typeIsPauseByIsolation() { // exact match on PAUSE_BY_ISOLATION
+        return PAUSE_BY_ISOLATION == this;
+    }
+
+    public boolean typeIsKilledByIsolation() { // exact match on KILL_BY_ISOLATION
+        return KILL_BY_ISOLATION == this;
+    }
+
+    public boolean typeIsIsolated() { // either isolation status: paused or killed by isolation
+        return typeIsPauseByIsolation() || typeIsKilledByIsolation();
+    }
+
     /**
      * status is pause
      *
@@ -172,7 +190,7 @@ public enum ExecutionStatus {
      * @return status
      */
     public boolean typeIsCancel() {
-        return this == KILL || this == STOP;
+        return this == KILL || this == STOP || this == KILL_BY_ISOLATION;
     }
 
     public int getCode() {