You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/11/01 01:30:27 UTC

[dolphinscheduler] 02/05: Support coronation task (#182)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 350f1b3825c3ac3dda0a4b79d571ee6b42a606ba
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Oct 28 14:54:17 2022 +0800

    Support coronation task (#182)
    
    * Implement isolate task api interface
    
    * Add coronation task
---
 .../api/checker/CoronationTaskChecker.java         |  38 +++
 .../api/checker/CoronationTaskCheckerImpl.java     |  99 ++++++++
 .../api/checker/IsolationTaskChecker.java          |   9 -
 .../api/checker/IsolationTaskCheckerImpl.java      |  55 -----
 .../constants/ApiFuncIdentificationConstant.java   |   7 +-
 .../api/controller/CoronationTaskController.java   |  91 +++++++
 .../api/controller/IsolationTaskController.java    |  39 +--
 .../dto/request/CoronationTaskListingRequest.java  |  24 ++
 .../dto/request/CoronationTaskParseRequest.java    |  18 ++
 .../dto/request/CoronationTaskSubmitRequest.java   |  16 ++
 .../apache/dolphinscheduler/api/enums/Status.java  |  13 +
 .../api/remote/ApiServerRPCClient.java             |   5 +
 .../api/service/CoronationTaskService.java         |  30 +++
 .../api/service/IsolationTaskService.java          |  14 +-
 .../service/impl/CoronationTaskServiceImpl.java    | 205 ++++++++++++++++
 .../api/service/impl/IsolationTaskServiceImpl.java |  98 +++-----
 .../api/service/impl/TaskInstanceServiceImpl.java  |   3 +-
 .../api/vo/CoronationTaskExcelImportVO.java        |  27 ++
 .../api/vo/CoronationTaskParseVO.java              |  34 +++
 .../api/vo/IsolationTaskListingVO.java             |  12 +-
 .../apache/dolphinscheduler/common/Constants.java  |   3 +
 .../dolphinscheduler/common/enums/CommandType.java |   2 +
 .../dolphinscheduler/common/utils/JSONUtils.java   |   1 +
 .../dao/dto/CoronationTaskDTO.java                 |  50 ++++
 .../dao/dto/IsolationTaskStatus.java               |  27 --
 .../dao/dto/TaskRuntimeContext.java                |  20 ++
 .../dao/dto/TaskSimpleInfoDTO.java                 |  15 ++
 .../{IsolationTask.java => CoronationTask.java}    |   6 +-
 .../dolphinscheduler/dao/entity/IsolationTask.java |   2 -
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  10 +
 .../dolphinscheduler/dao/mapper/CommandMapper.java |   3 +
 .../dao/mapper/CoronationTaskMapper.java           |  24 ++
 .../dao/mapper/IsolationTaskMapper.java            |  18 +-
 .../dao/mapper/ProcessInstanceMapper.java          |   2 +
 .../dao/mapper/TaskInstanceMapper.java             |   4 +
 .../dao/repository/CommandDao.java                 |   2 +
 .../dao/repository/CoronationTaskDao.java          |  28 +++
 .../dao/repository/IsolationTaskDao.java           |  14 +-
 .../dao/repository/ProcessDefinitionDao.java       |   4 +-
 .../dao/repository/ProcessInstanceDao.java         |   5 +-
 .../dao/repository/TaskInstanceDao.java            |   4 +
 .../dao/repository/impl/CommandDaoImpl.java        |   7 +
 .../dao/repository/impl/CoronationTaskDaoImpl.java |  65 +++++
 .../dao/repository/impl/IsolationTaskDaoImpl.java  |  32 +--
 .../repository/impl/ProcessDefinitionDaoImpl.java  |   6 +-
 .../repository/impl/ProcessInstanceDaoImpl.java    |  11 +-
 .../dao/repository/impl/TaskInstanceDaoImpl.java   |   8 +
 .../dao/mapper/CoronationTaskMapper.xml            |  69 ++++++
 .../dao/mapper/IsolationTaskMapper.xml             |  32 +--
 .../dao/mapper/TaskInstanceMapper.xml              |   2 +-
 .../src/main/resources/sql/dolphinscheduler_h2.sql |  19 +-
 .../main/resources/sql/dolphinscheduler_mysql.sql  |  25 +-
 .../RefreshCoronationMetadataProcessor.java}       |  14 +-
 ...java => RefreshIsolationMetadataProcessor.java} |  10 +-
 .../server/master/rpc/MasterRPCClient.java         |   7 +-
 .../server/master/rpc/MasterRPCServer.java         |  14 +-
 .../master/runner/FailoverExecuteThread.java       |  11 +-
 .../master/runner/MasterSchedulerBootstrap.java    |  11 +-
 .../master/runner/WorkflowExecuteRunnable.java     | 208 ++++++++++++----
 .../master/service/CoronationMetadataManager.java  | 273 +++++++++++++++++++++
 .../master/service/IsolationMetadataManager.java   | 136 ++++++++++
 .../master/service/IsolationTaskManager.java       |  97 --------
 .../transformer/MasterCommandTransformer.java      |  14 ++
 .../transformer/MasterCommandTransformerImpl.java  |  45 ++++
 .../server/master/utils/WorkflowInstanceUtils.java |  18 ++
 .../master/runner/WorkflowExecuteRunnableTest.java |  46 ++--
 .../remote/command/CommandType.java                |   4 +-
 .../RefreshCoronationMetadataRequest.java}         |   7 +-
 ...t.java => RefreshIsolationMetadataRequest.java} |   8 +-
 .../service/process/ProcessServiceImpl.java        |  17 +-
 .../plugin/task/api/enums/ExecutionStatus.java     |   8 +-
 71 files changed, 1815 insertions(+), 490 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskChecker.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskChecker.java
new file mode 100644
index 0000000000..b44feb9556
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskChecker.java
@@ -0,0 +1,38 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.List;
+
+/**
+ * Pre-condition checks for the coronation-task API operations.
+ * <p>
+ * Each method returns normally when the operation is allowed. The visible implementation
+ * ({@code CoronationTaskCheckerImpl}) rejects an operation by throwing {@link ServiceException}.
+ */
+public interface CoronationTaskChecker {
+
+    /**
+     * Checks whether the login user may parse coronation tasks for the given workflow instance.
+     *
+     * @param loginUser       user performing the request
+     * @param project         project the request is addressed to
+     * @param processInstance workflow instance the parse request refers to; declared non-null here to
+     *                        match the null-check already enforced by the implementation
+     * @param vo              import payload of the parse request
+     * @throws ServiceException if the check fails
+     */
+    void checkCanParseCoronationTask(@NonNull User loginUser,
+                                     @NonNull Project project,
+                                     @NonNull ProcessInstance processInstance,
+                                     @NonNull CoronationTaskExcelImportVO vo) throws ServiceException;
+
+    /**
+     * Checks whether the login user may submit the given coronation tasks.
+     *
+     * @param loginUser       user performing the request
+     * @param project         project the request is addressed to
+     * @param processInstance workflow instance the coronation tasks belong to
+     * @param workflowDAG     DAG of the workflow instance
+     * @param voList          coronation tasks to submit
+     */
+    void checkCanSubmitTaskCoronation(@NonNull User loginUser,
+                                      @NonNull Project project,
+                                      @NonNull ProcessInstance processInstance,
+                                      @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG,
+                                      @NonNull List<CoronationTaskParseVO> voList);
+
+    /**
+     * Checks whether the login user may list the coronation tasks of a project.
+     *
+     * @param loginUser   user performing the request
+     * @param projectCode code of the project whose coronation tasks are listed
+     */
+    void checkCanListingTaskCoronation(@NonNull User loginUser,
+                                       long projectCode);
+
+    /**
+     * Checks whether the login user may cancel the given coronation task.
+     *
+     * @param loginUser       user performing the request
+     * @param projectCode     code of the project the request is addressed to
+     * @param processInstance workflow instance the coronation task belongs to
+     * @param coronationTask  coronation task to cancel
+     */
+    void checkCanCancelTaskCoronation(@NonNull User loginUser,
+                                      long projectCode,
+                                      @NonNull ProcessInstance processInstance,
+                                      @NonNull CoronationTask coronationTask);
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskCheckerImpl.java
new file mode 100644
index 0000000000..23a6200adb
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskCheckerImpl.java
@@ -0,0 +1,99 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.CORONATION_TASK_CANCEL;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.CORONATION_TASK_PARSE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.CORONATION_TASK_VIEW;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+
+import java.util.List;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Default {@link CoronationTaskChecker}.
+ * <p>
+ * For operations bound to a workflow instance it first verifies that the instance's process
+ * definition exists and belongs to the requested project, then delegates the permission check to
+ * {@link ProjectService#checkProjectAndAuth}.
+ */
+@Slf4j
+@Service
+public class CoronationTaskCheckerImpl implements CoronationTaskChecker {
+
+    @Autowired
+    private ProcessDefinitionDao processDefinitionDao;
+
+    @Autowired
+    private ProjectService projectService;
+
+    /**
+     * Checks that the workflow instance belongs to {@code project} and runs the project permission
+     * check with {@code CORONATION_TASK_PARSE}. The {@code vo} argument is only null-checked.
+     *
+     * @throws ServiceException if the process definition does not exist or belongs to another project
+     */
+    @Override
+    public void checkCanParseCoronationTask(@NonNull User loginUser,
+                                            @NonNull Project project,
+                                            @NonNull ProcessInstance processInstance,
+                                            @NonNull CoronationTaskExcelImportVO vo) throws ServiceException {
+        checkWorkflowInstanceBelongsToProject(processInstance, project.getCode());
+        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CORONATION_TASK_PARSE);
+    }
+
+    /**
+     * Checks that the workflow instance belongs to {@code project} and runs the project permission
+     * check. {@code workflowDAG} and {@code voList} are only null-checked for now (see the todo).
+     *
+     * @throws ServiceException if the process definition does not exist or belongs to another project
+     */
+    @Override
+    public void checkCanSubmitTaskCoronation(@NonNull User loginUser,
+                                             @NonNull Project project,
+                                             @NonNull ProcessInstance processInstance,
+                                             @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG,
+                                             @NonNull List<CoronationTaskParseVO> voList) throws ServiceException {
+        checkWorkflowInstanceBelongsToProject(processInstance, project.getCode());
+        // NOTE(review): submit is authorized with the PARSE permission; no dedicated submit
+        // permission constant is visible here - confirm this is intended.
+        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CORONATION_TASK_PARSE);
+        // todo: check if the vos can submit coronation tasks
+        // if there already exist coronation task in db or the upstream task contains coronation task, we cannot submit
+
+    }
+
+    /**
+     * Loads the project by code and runs the project permission check with
+     * {@code CORONATION_TASK_VIEW}.
+     */
+    @Override
+    public void checkCanListingTaskCoronation(@NonNull User loginUser,
+                                              long projectCode) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        projectService.checkProjectAndAuth(loginUser, project, projectCode, CORONATION_TASK_VIEW);
+    }
+
+    /**
+     * Checks that the workflow instance belongs to the project identified by {@code projectCode},
+     * then loads the project and runs the permission check with {@code CORONATION_TASK_CANCEL}.
+     * The {@code coronationTask} argument is only null-checked.
+     *
+     * @throws ServiceException if the process definition does not exist or belongs to another project
+     */
+    @Override
+    public void checkCanCancelTaskCoronation(@NonNull User loginUser, long projectCode,
+                                             @NonNull ProcessInstance processInstance,
+                                             @NonNull CoronationTask coronationTask) {
+        // Ownership is verified before the project lookup, same order as before the refactoring.
+        checkWorkflowInstanceBelongsToProject(processInstance, projectCode);
+
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        projectService.checkProjectAndAuth(loginUser, project, projectCode, CORONATION_TASK_CANCEL);
+    }
+
+    /**
+     * Ensures the process definition of the workflow instance exists and is owned by the project.
+     *
+     * @param processInstance workflow instance whose process definition is looked up
+     * @param projectCode     code of the project the definition must belong to
+     * @throws ServiceException with {@code PROCESS_DEFINE_NOT_EXIST} when the definition is missing,
+     *                          or {@code PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT} when the
+     *                          definition's project code differs from {@code projectCode}
+     */
+    private void checkWorkflowInstanceBelongsToProject(ProcessInstance processInstance, long projectCode) {
+        ProcessDefinition processDefinition =
+                processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode())
+                        .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
+                                processInstance.getProcessDefinitionCode()));
+        if (processDefinition.getProjectCode() != projectCode) {
+            throw new ServiceException(Status.PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
+        }
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
index 9ee19f10be..919464290c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
@@ -15,18 +15,9 @@ public interface IsolationTaskChecker {
                                      ProcessInstance processInstance,
                                      @NonNull List<IsolationTaskExcelParseVO> voList);
 
-    void checkCanOnlineTaskIsolation(@NonNull User loginUser,
-                                     long projectCode,
-                                     @NonNull ProcessInstance processInstance,
-                                     @NonNull IsolationTask isolationTask);
-
     void checkCanListingTaskIsolation(@NonNull User loginUser,
                                       long projectCode);
 
-    void checkCanDeleteTaskIsolation(@NonNull User loginUser,
-                                     long projectCode,
-                                     long isolationId);
-
     void checkCanCancelTaskIsolation(@NonNull User loginUser,
                                      long projectCode,
                                      ProcessInstance processInstance,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
index 99402f7fb6..d5d724db85 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
@@ -9,7 +9,6 @@ import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -26,14 +25,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_CANCEL;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_DELETE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_LIST;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_ONLINE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_SUBMIT;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE;
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK;
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST;
@@ -79,22 +72,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
         checkTaskNodeCanSubmitIsolateTask(voList, workflowDAG);
     }
 
-    @Override
-    public void checkCanOnlineTaskIsolation(@NonNull User loginUser,
-                                            long projectCode,
-                                            ProcessInstance processInstance,
-                                            @NonNull IsolationTask isolationTask) {
-        Project project = projectService.queryByCode(loginUser, projectCode);
-        checkOnlineIsolationTaskAuth(loginUser, project, processInstance);
-
-        if (IsolationTaskStatus.ONLINE.getCode() == isolationTask.getStatus()) {
-            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE);
-        }
-
-        String workflowInstanceName = isolationTask.getWorkflowInstanceName();
-        checkWorkflowInstanceCanSubmitIsolateTask(workflowInstanceName, processInstance);
-    }
-
     @Override
     public void checkCanListingTaskIsolation(@NonNull User loginUser, long projectCode) {
         Project project = projectService.queryByCode(loginUser, projectCode);
@@ -102,20 +79,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
         projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_LIST);
     }
 
-    @Override
-    public void checkCanDeleteTaskIsolation(@NonNull User loginUser,
-                                            long projectCode,
-                                            long isolationId) {
-        Project project = projectService.queryByCode(loginUser, projectCode);
-        projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_DELETE);
-
-        IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
-                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
-        if (isolationTask.getStatus() != IsolationTaskStatus.OFFLINE.getCode()) {
-            throw new ServiceException(ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE);
-        }
-    }
-
     @Override
     public void checkCanCancelTaskIsolation(@NonNull User loginUser,
                                             long projectCode,
@@ -127,9 +90,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
         Project project = projectService.queryByCode(loginUser, projectCode);
         checkCancelIsolationTaskAuth(loginUser, project, processInstance);
 
-        if (isolationTask.getStatus() == IsolationTaskStatus.OFFLINE.getCode()) {
-            throw new ServiceException(ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL);
-        }
     }
 
     private void checkWorkflowInstanceCanSubmitIsolateTask(@NonNull String workflowInstanceName,
@@ -173,21 +133,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
         projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_SUBMIT);
     }
 
-    private void checkOnlineIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
-        if (processInstance == null) {
-            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
-        }
-        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
-                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
-                        processInstance.getProcessDefinitionVersion())
-                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
-        if (processDefinitionLog.getProjectCode() != project.getCode()) {
-            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
-        }
-
-        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_ONLINE);
-    }
-
     private void checkCancelIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
         if (processInstance == null) {
             throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
index c8cdd2cb58..6bc0eb475a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.api.constants;
 
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -97,10 +98,12 @@ public class ApiFuncIdentificationConstant {
     public static final String FORCED_SUCCESS = "project:task-instance:force-success";
     public static final String CLEAN_TASK_INSTANCE_STATE = "project:task-instance:clean-state";
     public static final String ISOLATION_TASK_LIST = "project:isolation-task:list";
-    public static final String ISOLATION_TASK_ONLINE = "project:isolation-task:online";
     public static final String ISOLATION_TASK_SUBMIT = "project:isolation-task:submit";
     public static final String ISOLATION_TASK_CANCEL = "project:isolation-task:cancel";
-    public static final String ISOLATION_TASK_DELETE = "project:isolation-task:delete";
+
+    public static final String CORONATION_TASK_VIEW = "project:coronation-task:view";
+    public static final String CORONATION_TASK_PARSE = "project:coronation-task:parse";
+    public static final String CORONATION_TASK_CANCEL = "project:coronation-task:cancel";
     public static final String VIEW_LOG = "project:log:detail";
     public static final String DOWNLOAD_LOG = "project:log:download-log";
     public static final String PROJECT_OVERVIEW = "project:overview:view";
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
new file mode 100644
index 0000000000..80ee7a1107
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
@@ -0,0 +1,91 @@
+package org.apache.dolphinscheduler.api.controller;
+
+import io.swagger.annotations.Api;
+import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskParseRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.service.CoronationTaskService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+import springfox.documentation.annotations.ApiIgnore;
+
+import java.util.List;
+
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_CANCEL_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_DELETE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_LISTING_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_ONLINE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_PARSE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_SUBMIT_ERROR;
+
+/**
+ * REST endpoints for the coronation tasks of a project, rooted at
+ * {@code /projects/{projectCode}/coronation-task}. Every handler delegates to
+ * {@link CoronationTaskService} and wraps the outcome in a {@link Result}.
+ * <p>
+ * NOTE(review): the Swagger tag is "ISOLATION_TASK_TAG", which looks copied from the isolation
+ * task controller - confirm whether a dedicated coronation tag is intended.
+ */
+@Api(tags = "ISOLATION_TASK_TAG")
+@RestController
+@RequestMapping("/projects/{projectCode}/coronation-task")
+public class CoronationTaskController {
+
+    @Autowired
+    private CoronationTaskService coronationTaskService;
+
+    /**
+     * POST {@code /parse}: hands the parse request to the service and returns the parsed tasks.
+     *
+     * @param loginUser   session user, taken from the request attribute
+     * @param projectCode project code from the path
+     * @param request     parse request read from the request body
+     * @return the service's parse result wrapped in a success {@link Result}
+     */
+    @PostMapping(value = "/parse")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(CORONATION_TASK_PARSE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<List<CoronationTaskParseVO>> parseCoronationTask(
+            @ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+            @PathVariable("projectCode") long projectCode,
+            @RequestBody CoronationTaskParseRequest request) {
+        List<CoronationTaskParseVO> parsedTasks =
+                coronationTaskService.parseCoronationTask(loginUser, projectCode, request);
+        return Result.success(parsedTasks);
+    }
+
+    /**
+     * POST {@code /submit}: hands the submit request to the service.
+     *
+     * @param loginUser   session user, taken from the request attribute
+     * @param projectCode project code from the path
+     * @param request     submit request read from the request body
+     * @return a success {@link Result} without payload
+     */
+    @PostMapping(value = "/submit")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(CORONATION_TASK_SUBMIT_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> submitCoronationTask(
+            @ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+            @PathVariable("projectCode") long projectCode,
+            @RequestBody CoronationTaskSubmitRequest request) {
+        coronationTaskService.submitCoronationTask(loginUser, projectCode, request);
+        return Result.success(null);
+    }
+
+    /**
+     * PUT {@code /cancel/{id}}: asks the service to cancel the coronation task with the given id.
+     *
+     * @param loginUser   session user, taken from the request attribute
+     * @param projectCode project code from the path
+     * @param id          coronation task id from the path
+     * @return a success {@link Result} without payload
+     */
+    @PutMapping(value = "/cancel/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(CORONATION_TASK_CANCEL_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> cancelCoronationTask(
+            @ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+            @PathVariable("projectCode") long projectCode,
+            @PathVariable("id") long id) {
+        coronationTaskService.cancelCoronationTask(loginUser, projectCode, id);
+        return Result.success(null);
+    }
+
+    /**
+     * GET on the controller root: returns one page of coronation tasks from the service.
+     * <p>
+     * NOTE(review): this GET handler reads its filter from the request body, whereas the isolation
+     * task listing endpoint takes query parameters - confirm that clients can send a body on GET.
+     *
+     * @param loginUser   session user, taken from the request attribute
+     * @param projectCode project code from the path
+     * @param request     listing request read from the request body
+     * @return the service's page wrapped in a success {@link Result}
+     */
+    @GetMapping("")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(CORONATION_TASK_LISTING_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<PageInfo<CoronationTaskDTO>> listingCoronationTasks(
+            @ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+            @PathVariable("projectCode") long projectCode,
+            @RequestBody CoronationTaskListingRequest request) {
+        PageInfo<CoronationTaskDTO> page =
+                coronationTaskService.listingCoronationTasks(loginUser, projectCode, request);
+        return Result.success(page);
+    }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
index f4b445456d..ef9c404113 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
@@ -8,12 +8,12 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.IsolationTaskService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskListingVO;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -21,13 +21,11 @@ import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestAttribute;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
 import springfox.documentation.annotations.ApiIgnore;
 
-import javax.validation.Valid;
-
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_LISTING_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR;
 
@@ -50,17 +48,6 @@ public class IsolationTaskController {
         return Result.success(null);
     }
 
-    @PutMapping(value = "/online/{id}")
-    @ResponseStatus(HttpStatus.OK)
-    @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
-    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
-    public Result<Void> onlineIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                            @PathVariable long projectCode,
-                                            @PathVariable(name = "id") long isolationId) {
-        isolationTaskService.onlineTaskIsolation(loginUser, projectCode, isolationId);
-        return Result.success(null);
-    }
-
     @PutMapping(value = "/cancel/{id}")
     @ResponseStatus(HttpStatus.OK)
     @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
@@ -76,21 +63,15 @@ public class IsolationTaskController {
     @ResponseStatus(HttpStatus.OK)
     @ApiException(ISOLATION_TASK_LISTING_ERROR)
     @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
-    public Result<PageInfo<IsolationTask>> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                                                @PathVariable long projectCode,
-                                                                @RequestBody @Valid IsolationTaskListingRequest request) {
+    public Result<PageInfo<IsolationTaskListingVO>> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                                         @PathVariable long projectCode,
+                                                                         @RequestParam(required = false) String workflowInstanceName,
+                                                                         @RequestParam(required = false) String taskName,
+                                                                         @RequestParam Integer pageNo,
+                                                                         @RequestParam Integer pageSize) {
+        IsolationTaskListingRequest request =
+                new IsolationTaskListingRequest(workflowInstanceName, taskName, pageNo, pageSize);
         return Result.success(isolationTaskService.listingTaskIsolation(loginUser, projectCode, request));
     }
 
-    @DeleteMapping("/{id}")
-    @ResponseStatus(HttpStatus.OK)
-    @ApiException(ISOLATION_TASK_DELETE_ERROR)
-    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
-    public Result<Void> deleteIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                            @PathVariable long projectCode,
-                                            @PathVariable long id) {
-        isolationTaskService.deleteTaskIsolation(loginUser, projectCode, id);
-        return Result.success(null);
-    }
-
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
new file mode 100644
index 0000000000..7ecded2c1c
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+/** Filter and paging parameters for a coronation task listing query. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskListingRequest {
+    /** Workflow instance name filter; carries no validation constraint, so it may be null. */
+    private String workflowInstanceName;
+    /** Task name filter; carries no validation constraint, so it may be null. */
+    private String taskName;
+    /** Page to fetch; annotated as required (whether numbering starts at 0 or 1 is not visible here). */
+    @NotNull(message = "pageNo cannot be null")
+    private Integer pageNo;
+    /** Number of records per page; annotated as required. */
+    @NotNull(message = "pageSize cannot be null")
+    private Integer pageSize;
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskParseRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskParseRequest.java
new file mode 100644
index 0000000000..4fa9c384de
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskParseRequest.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+
+import java.util.List;
+/** Request DTO carrying the coronation task rows that should be parsed. */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskParseRequest {
+    /** Rows to parse; presumably produced by an Excel import, judging by the element type. */
+    private List<CoronationTaskExcelImportVO> coronationTasks;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
new file mode 100644
index 0000000000..2597a1206e
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+
+import java.util.List;
+/** Request DTO carrying the coronation tasks to submit. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskSubmitRequest {
+    /** Tasks to submit; lower camel case keeps the Lombok accessors named get/setCoronationTasks. */
+    private List<CoronationTaskParseVO> coronationTasks;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 602cb477dd..71e462bbf9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -550,6 +550,19 @@ public enum Status {
     ISOLATION_TASK_DELETE_ERROR(1500500, "Delete isolation task error", "删除隔离任务异常"),
     ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE(1500501, "Delete isolation task error, the task status is not offline",
             "删除隔离任务异常,该隔离任务尚未下线"),
+
+    CORONATION_TASK_PARSE_ERROR(1600000, "Coronation task parse error", "解析加冕任务异常"),
+    CORONATION_TASK_NOT_EXIST(1600001, "Coronation task not exist", "加冕任务不存在"),
+    CORONATION_TASK_PARSE_ERROR_TASK_NODE_NAME_IS_NOT_VALIDATED(1600002,
+            "Coronation task parse error, taskNode name is not validated", "解析加冕任务异常, 任务名不正确"),
+    CORONATION_TASK_SUBMIT_ERROR(1600100, "Coronation task submit error", "提交加冕任务异常"),
+    CORONATION_TASK_ONLINE_ERROR(1600200, "Coronation task online error", "上线加冕任务异常"),
+    CORONATION_TASK_CANCEL_ERROR(1600300, "Coronation task cancel error", "取消加冕任务异常"),
+    CORONATION_TASK_LISTING_ERROR(1600400, "Coronation task listing error", "查询加冕任务异常"),
+    CORONATION_TASK_DELETE_ERROR(1600500, "Coronation task delete error", "删除加冕任务异常"),
+
+    PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT(1700000,
+            "The workflow instance doesn't belong to the current project", "工作流实例不属于当前项目"),
             ;
 
     private final int code;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
index ef3634cd52..a0d82e4632 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
@@ -27,6 +27,11 @@ public class ApiServerRPCClient {
         sendSyncCommand(host, rpcCommand, DEFAULT_TIME_OUT_MILLS);
     }
 
+    public void send(@NonNull Host host,
+                     @NonNull Command rpcCommand) throws RemotingException {
+        client.send(host, rpcCommand); // plain delegation: no timeout argument, no result handed back
+    }
+
     public void sendSyncCommand(@NonNull Host host,
                                 @NonNull Command rpcCommand,
                                 long timeoutMills) throws RemotingException, InterruptedException {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/CoronationTaskService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/CoronationTaskService.java
new file mode 100644
index 0000000000..a45a843d4a
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/CoronationTaskService.java
@@ -0,0 +1,30 @@
+package org.apache.dolphinscheduler.api.service;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskParseRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.List;
+/** Operations for parsing, submitting, listing and cancelling coronation tasks. */
+public interface CoronationTaskService {
+    /** Parses the requested rows and returns one {@link CoronationTaskParseVO} per row. */
+    List<CoronationTaskParseVO> parseCoronationTask(@NonNull User loginUser,
+                                                    long projectCode,
+                                                    @NonNull CoronationTaskParseRequest request);
+    /** Submits the coronation tasks contained in {@code request}. */
+    void submitCoronationTask(@NonNull User loginUser,
+                              long projectCode,
+                              @NonNull CoronationTaskSubmitRequest request);
+    /** Returns one page of coronation tasks selected by {@code request}. */
+    PageInfo<CoronationTaskDTO> listingCoronationTasks(@NonNull User loginUser,
+                                                       long projectCode,
+                                                       @NonNull CoronationTaskListingRequest request);
+    /** Cancels the coronation task identified by {@code id}. */
+    void cancelCoronationTask(@NonNull User loginUser, long projectCode, long id);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
index 7225d19007..16cd6ec225 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
@@ -5,6 +5,7 @@ import org.apache.dolphinscheduler.api.dto.request.IsolationTaskCancelRequest;
 import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
 import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskListingVO;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.User;
 
@@ -14,19 +15,12 @@ public interface IsolationTaskService {
                               long projectCode,
                               @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest);
 
-    void onlineTaskIsolation(@NonNull User loginUser,
-                             long projectCode,
-                             long isolationTaskId);
-
     void cancelTaskIsolation(@NonNull User loginUser,
                              long projectCode,
                              long isolationId);
 
-    PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
-                                                 long projectCode,
-                                                 @NonNull IsolationTaskListingRequest request);
+    PageInfo<IsolationTaskListingVO> listingTaskIsolation(@NonNull User loginUser,
+                                                          long projectCode,
+                                                          @NonNull IsolationTaskListingRequest request);
 
-    void deleteTaskIsolation(@NonNull User loginUser,
-                             long projectCode,
-                             long id);
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
new file mode 100644
index 0000000000..a6c0691c8b
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
@@ -0,0 +1,262 @@
+package org.apache.dolphinscheduler.api.service.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.api.checker.CoronationTaskChecker;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskParseRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.remote.ApiServerRPCClient;
+import org.apache.dolphinscheduler.api.service.CoronationTaskService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.dto.TaskSimpleInfoDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.coronation.RefreshCoronationMetadataRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link CoronationTaskService} implementation that stores coronation tasks through {@link CoronationTaskDao}.
+ *
+ * <p>After a submit or cancel changes the stored tasks, every registered master is asked to refresh its
+ * coronation metadata.
+ */
+@Slf4j
+@Service
+public class CoronationTaskServiceImpl implements CoronationTaskService {
+
+    @Autowired
+    private CoronationTaskChecker coronationTaskChecker;
+
+    @Autowired
+    private CoronationTaskDao coronationTaskDao;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private WorkflowDAGService workflowDAGService;
+
+    @Autowired
+    private RegistryClient registryClient;
+
+    @Autowired
+    private ApiServerRPCClient apiServerRPCClient;
+
+    /**
+     * Resolves each requested row against the DAG of its workflow instance.
+     *
+     * @param loginUser   user issuing the request
+     * @param projectCode code of the project the request is scoped to
+     * @param request     rows to parse
+     * @return one entry per row, carrying the task together with its direct upstream tasks
+     * @throws ServiceException if a workflow instance does not exist, or if the task code is unknown to the
+     *                          workflow DAG or its name differs from the DAG node's name
+     */
+    @Override
+    public List<CoronationTaskParseVO> parseCoronationTask(@NonNull User loginUser,
+                                                           long projectCode,
+                                                           @NonNull CoronationTaskParseRequest request) {
+        List<CoronationTaskExcelImportVO> coronationTasks = request.getCoronationTasks();
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        return coronationTasks.stream()
+                .map(vo -> {
+                    ProcessInstance workflowInstance =
+                            processInstanceDao.queryProcessInstanceById(vo.getWorkflowInstanceId())
+                                    .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
+                    coronationTaskChecker.checkCanParseCoronationTask(loginUser, project, workflowInstance, vo);
+                    DAG<String, TaskNode, TaskNodeRelation> workflowDAG =
+                            workflowDAGService.getWorkflowDAG(workflowInstance.getProcessDefinitionCode(),
+                                    workflowInstance.getProcessDefinitionVersion());
+                    String taskCode = Long.toString(vo.getTaskCode());
+                    TaskNode taskNode = workflowDAG.getNode(taskCode);
+                    // Guard against a task code the DAG does not contain: report it as an invalid task
+                    // instead of dereferencing a missing node.
+                    if (taskNode == null || !vo.getTaskName().equals(taskNode.getName())) {
+                        throw new ServiceException(Status.CORONATION_TASK_PARSE_ERROR_TASK_NODE_NAME_IS_NOT_VALIDATED);
+                    }
+                    List<TaskSimpleInfoDTO> previousTaskNodeDTO = workflowDAG.getPreviousNodes(taskCode)
+                            .stream()
+                            .map(previousNodeCode -> toTaskSimpleInfo(workflowDAG, previousNodeCode))
+                            .collect(Collectors.toList());
+                    return CoronationTaskParseVO.builder()
+                            .workflowInstanceId(workflowInstance.getId())
+                            .workflowInstanceName(workflowInstance.getName())
+                            .taskNode(vo.getTaskName())
+                            .taskCode(vo.getTaskCode())
+                            .upstreamTasks(previousTaskNodeDTO)
+                            .build();
+                }).collect(Collectors.toList());
+    }
+
+    /**
+     * Stores the submitted coronation tasks and asks the masters to refresh their coronation metadata.
+     *
+     * <p>For every task, the direct upstream tasks that are not listed in the submitted upstream tasks are
+     * recorded as forbidden.
+     *
+     * @param loginUser   user issuing the request
+     * @param projectCode code of the project the request is scoped to
+     * @param request     coronation tasks to submit
+     * @throws ServiceException if a referenced workflow instance does not exist
+     */
+    @Override
+    public void submitCoronationTask(@NonNull User loginUser, long projectCode,
+                                     @NonNull CoronationTaskSubmitRequest request) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        Map<Integer, List<CoronationTaskParseVO>> coronationTaskMap = request.getCoronationTasks().stream()
+                .collect(Collectors.groupingBy(CoronationTaskParseVO::getWorkflowInstanceId));
+
+        List<CoronationTask> needToInsertIntoDB = new ArrayList<>();
+
+        for (Map.Entry<Integer, List<CoronationTaskParseVO>> entry : coronationTaskMap.entrySet()) {
+            Integer workflowInstanceId = entry.getKey();
+            List<CoronationTaskParseVO> vos = entry.getValue();
+
+            ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+                    .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
+            DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(
+                    workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion());
+            coronationTaskChecker.checkCanSubmitTaskCoronation(loginUser, project, workflowInstance, workflowDAG, vos);
+
+            List<CoronationTask> coronationTasks = vos.stream()
+                    .map(vo -> {
+                        Set<String> previousNodes = workflowDAG.getPreviousNodes(vo.getTaskCode().toString());
+                        Set<String> selectNodes = vo.getUpstreamTasks()
+                                .stream()
+                                .map(upstreamTask -> Long.toString(upstreamTask.getTaskCode()))
+                                .collect(Collectors.toSet());
+                        // Upstream nodes that were not selected are recorded as forbidden to execute.
+                        List<TaskSimpleInfoDTO> needToForbiddenTaskCodes =
+                                CollectionUtils.subtract(previousNodes, selectNodes)
+                                        .stream()
+                                        .map(nodeCode -> toTaskSimpleInfo(workflowDAG, nodeCode))
+                                        .collect(Collectors.toList());
+
+                        return CoronationTask.builder()
+                                .workflowInstanceId(vo.getWorkflowInstanceId())
+                                .workflowInstanceName(vo.getWorkflowInstanceName())
+                                .taskName(vo.getTaskNode())
+                                .taskCode(vo.getTaskCode())
+                                .forbiddenUpstreamTasks(JSONUtils.toJsonString(needToForbiddenTaskCodes))
+                                .build();
+                    }).collect(Collectors.toList());
+            needToInsertIntoDB.addAll(coronationTasks);
+        }
+        coronationTaskDao.batchInsert(needToInsertIntoDB);
+        sendSyncCoronationTasksRequestToMaster();
+    }
+
+    /**
+     * Deletes a coronation task and asks the masters to refresh their coronation metadata.
+     *
+     * @param loginUser   user issuing the request
+     * @param projectCode code of the project the request is scoped to
+     * @param id          id of the coronation task to cancel
+     * @throws ServiceException if the coronation task or its workflow instance does not exist
+     */
+    @Override
+    public void cancelCoronationTask(@NonNull User loginUser, long projectCode, long id) {
+        // Load the task and its workflow instance; both are handed to the checker before deleting.
+        CoronationTask coronationTask = coronationTaskDao.queryCoronationTaskById(id)
+                .orElseThrow(() -> new ServiceException(Status.CORONATION_TASK_NOT_EXIST));
+        ProcessInstance workflowInstance =
+                processInstanceDao.queryProcessInstanceById(coronationTask.getWorkflowInstanceId())
+                        .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
+        coronationTaskChecker.checkCanCancelTaskCoronation(loginUser, projectCode, workflowInstance, coronationTask);
+        coronationTaskDao.deleteById(id);
+        sendSyncCoronationTasksRequestToMaster();
+    }
+
+    /**
+     * Returns one page of coronation tasks matching the request filters.
+     *
+     * <p>NOTE(review): {@code projectCode} is only handed to the checker; the page query itself receives no
+     * project filter — confirm this is intended.
+     *
+     * @param loginUser   user issuing the request
+     * @param projectCode code of the project the request is scoped to
+     * @param request     filters and paging parameters
+     * @return the requested page together with the total number of matching records
+     */
+    @Override
+    public PageInfo<CoronationTaskDTO> listingCoronationTasks(@NonNull User loginUser, long projectCode,
+                                                              @NonNull CoronationTaskListingRequest request) {
+        coronationTaskChecker.checkCanListingTaskCoronation(loginUser, projectCode);
+
+        Integer pageNo = request.getPageNo();
+        Integer pageSize = request.getPageSize();
+
+        IPage<CoronationTask> iPage = coronationTaskDao.pageQueryCoronationTask(
+                request.getWorkflowInstanceName(),
+                request.getTaskName(),
+                pageNo,
+                pageSize);
+
+        PageInfo<CoronationTaskDTO> pageInfo = new PageInfo<>(pageNo, pageSize);
+        pageInfo.setTotal((int) iPage.getTotal());
+        pageInfo.setTotalList(iPage.getRecords().stream().map(CoronationTaskDTO::new).collect(Collectors.toList()));
+
+        return pageInfo;
+    }
+
+    /**
+     * Sends a refresh-coronation-metadata command to every master found in the registry.
+     *
+     * <p>A {@link RemotingException} for one master is logged and the remaining masters are still tried.
+     */
+    private void sendSyncCoronationTasksRequestToMaster() {
+        List<Server> masters = registryClient.getServerList(NodeType.MASTER);
+        if (CollectionUtils.isEmpty(masters)) {
+            return;
+        }
+        Command command = new RefreshCoronationMetadataRequest().convert2Command();
+        for (Server master : masters) {
+            try {
+                apiServerRPCClient.send(new Host(master.getHost(), master.getPort()), command);
+            } catch (RemotingException e) {
+                log.error("Send RefreshCoronationTask request to master error, master: {}", master, e);
+            }
+        }
+    }
+
+    /** Looks up {@code nodeCode} in {@code workflowDAG} and reduces the node to its name and code. */
+    private static TaskSimpleInfoDTO toTaskSimpleInfo(DAG<String, TaskNode, TaskNodeRelation> workflowDAG,
+                                                      String nodeCode) {
+        TaskNode node = workflowDAG.getNode(nodeCode);
+        return new TaskSimpleInfoDTO(node.getName(), node.getCode());
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
index 4f6ff908b1..f3bdcf7600 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
@@ -16,12 +16,12 @@ import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
 import org.apache.dolphinscheduler.api.transformer.CommandTransformer;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskListingVO;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
+import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationMetadataRequest;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -49,7 +49,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR;
 
 @Slf4j
 @Service
@@ -83,7 +82,6 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
     private TaskInstanceDao taskInstanceDao;
 
     @Override
-    @Transactional
     public void submitTaskIsolations(@NonNull User loginUser,
                                      long projectCode,
                                      @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
@@ -100,7 +98,8 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
         for (Map.Entry<Integer, List<IsolationTaskExcelParseVO>> entry : workflow2VoMap.entrySet()) {
             Integer workflowInstanceId = entry.getKey();
             List<IsolationTaskExcelParseVO> vos = entry.getValue();
-            ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+            ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+                    .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
             isolationTaskChecker.checkCanSubmitTaskIsolation(loginUser, projectCode, processInstance, vos);
 
             List<IsolationTask> isolationTasks = entry.getValue().stream().map(vo -> {
@@ -109,36 +108,13 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
                         .workflowInstanceName(vo.getWorkflowInstanceName())
                         .taskName(vo.getTaskName())
                         .taskCode(vo.getTaskCode())
-                        .status(IsolationTaskStatus.ONLINE.getCode())
                         .build();
             }).collect(Collectors.toList());
             needToInsertIntoDB.addAll(isolationTasks);
             needToOnlineIsolations.add(Pair.of(processInstance, isolationTasks));
         }
         isolationTaskDao.batchInsert(needToInsertIntoDB);
-        // we split here to avoid rollback RPC request
-        try {
-            refreshIsolationTasks();
-        } catch (Exception ex) {
-            throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR);
-        }
-    }
-
-    @Override
-    @Transactional
-    public void onlineTaskIsolation(@NonNull User loginUser, long projectCode, long isolationTaskId) {
-        IsolationTask isolationTask = isolationTaskDao.queryById(isolationTaskId)
-                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
-        ProcessInstance processInstance =
-                processInstanceDao.queryProcessInstanceById(isolationTask.getWorkflowInstanceId());
-
-        isolationTaskChecker.checkCanOnlineTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
-        isolationTaskDao.updateIsolationTaskStatus(isolationTaskId, IsolationTaskStatus.ONLINE);
-        try {
-            refreshIsolationTasks();
-        } catch (Exception ex) {
-            throw new ServiceException(Status.ISOLATION_TASK_ONLINE_ERROR);
-        }
+        sendIsolationTaskRefreshRequestToMaster();
     }
 
     @Override
@@ -150,21 +126,18 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
                 .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
 
         Integer workflowInstanceId = isolationTask.getWorkflowInstanceId();
-        ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+        ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+                .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
         isolationTaskChecker.checkCanCancelTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
-        isolationTaskDao.updateIsolationTaskStatus(isolationTask.getId(), IsolationTaskStatus.OFFLINE);
-        insertRecoveryCommandIfNeeded(processInstance, isolationTask);
-        try {
-            refreshIsolationTasks();
-        } catch (RemotingException | InterruptedException e) {
-            throw new ServiceException(Status.ISOLATION_TASK_CANCEL_ERROR);
-        }
+        isolationTaskDao.deleteById(isolationTask.getId());
+        insertRecoveryCommandIfNeeded(processInstance);
+        sendIsolationTaskRefreshRequestToMaster();
     }
 
     @Override
-    public PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
-                                                        long projectCode,
-                                                        @NonNull IsolationTaskListingRequest request) {
+    public PageInfo<IsolationTaskListingVO> listingTaskIsolation(@NonNull User loginUser,
+                                                                 long projectCode,
+                                                                 @NonNull IsolationTaskListingRequest request) {
         isolationTaskChecker.checkCanListingTaskIsolation(loginUser, projectCode);
 
         Integer pageNo = request.getPageNo();
@@ -175,46 +148,48 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
                 request.getTaskName(),
                 pageNo,
                 pageSize);
-
-        PageInfo<IsolationTask> pageInfo = new PageInfo<>(pageNo, pageSize);
+        List<IsolationTaskListingVO> isolationTaskListingVOList = iPage.getRecords()
+                .stream()
+                .map(isolationTask -> {
+                    return IsolationTaskListingVO.builder()
+                            .id(isolationTask.getId())
+                            .workflowInstanceId(isolationTask.getWorkflowInstanceId())
+                            .workflowInstanceName(isolationTask.getWorkflowInstanceName())
+                            // todo: set task status
+                            .taskName(isolationTask.getTaskName())
+                            .taskCode(isolationTask.getTaskCode())
+                            .createTime(isolationTask.getCreateTime())
+                            .updateTime(isolationTask.getUpdateTime())
+                            .build();
+                }).collect(Collectors.toList());
+        PageInfo<IsolationTaskListingVO> pageInfo = new PageInfo<>(pageNo, pageSize);
         pageInfo.setTotal((int) iPage.getTotal());
-        pageInfo.setTotalList(iPage.getRecords());
+        pageInfo.setTotalList(isolationTaskListingVOList);
         return pageInfo;
     }
 
-    @Override
-    public void deleteTaskIsolation(@NonNull User loginUser, long projectCode, long id) {
-        isolationTaskChecker.checkCanDeleteTaskIsolation(loginUser, projectCode, id);
-        int deleteNum = isolationTaskDao.deleteByIdAndStatus(id, IsolationTaskStatus.OFFLINE);
-        if (deleteNum <= 0) {
-            throw new ServiceException(ISOLATION_TASK_NOT_EXIST);
-        }
-    }
-
-    private void refreshIsolationTasks() throws RemotingException, InterruptedException {
+    private void sendIsolationTaskRefreshRequestToMaster() { // best-effort broadcast to every registered master
         List<Server> masters = registryClient.getServerList(NodeType.MASTER);
         if (CollectionUtils.isEmpty(masters)) {
             return;
         }
 
         org.apache.dolphinscheduler.remote.command.Command refreshIsolationRequest =
-                new RefreshIsolationTaskRequest().convert2Command();
+                new RefreshIsolationMetadataRequest().convert2Command();
         for (Server master : masters) {
             try {
-                apiServerRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
-                        refreshIsolationRequest);
-            } catch (RemotingException | InterruptedException e) {
+                apiServerRPCClient.send(new Host(master.getHost(), master.getPort()), refreshIsolationRequest);
+            } catch (RemotingException e) { // logged only, so the remaining masters are still notified
                 log.error("Send RefreshIsolationTask request to master error, master: {}", master, e);
-                throw e;
             }
         }
     }
 
-    private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance,
-                                               @NonNull IsolationTask isolationTask) {
+    private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance) {
         if (processInstance.getState() != ExecutionStatus.PAUSE_BY_ISOLATION) {
             return;
         }
+        log.info("The current workflow instance is in PAUSE_BY_ISOLATION status, will insert a recovery command");
         int workflowInstanceId = processInstance.getId();
         // find the isolationTaskInstanceIds need to recovery
         // find the sub node is in pause or kill
@@ -225,7 +200,7 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
         List<TaskInstance> taskInstances =
                 taskInstanceDao.queryValidatedTaskInstanceByWorkflowInstanceId(workflowInstanceId);
         Set<String> onlineIsolationTaskCodes =
-                isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId, IsolationTaskStatus.ONLINE)
+                isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId)
                         .stream()
                         .map(onlineIsolationTask -> String.valueOf(onlineIsolationTask.getTaskCode()))
                         .collect(Collectors.toSet());
@@ -236,6 +211,7 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
                         onlineIsolationTaskCodes, workflowDAG))
                 .collect(Collectors.toList());
         if (CollectionUtils.isEmpty(canRecoveryTaskInstances)) {
+            log.error("The current workflow instance has no task instance can recovery");
             return;
         }
         // find if this taskInstance still exist pre isolationTasks
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 9d1dc166a2..6c5b49b94d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -242,7 +242,8 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
                     .entrySet()) {
                 Integer workflowInstanceId = processInstanceId2TaskInstanceIds.getKey();
                 List<Integer> needToCleanStateTaskInstanceIds = processInstanceId2TaskInstanceIds.getValue();
-                ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+                ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+                        .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
                 workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, workflowInstance);
                 Command command = commandTransformer.transformToCleanTaskInstanceStateCommand(workflowInstance,
                         needToCleanStateTaskInstanceIds);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskExcelImportVO.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskExcelImportVO.java
new file mode 100644
index 0000000000..dc21207513
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskExcelImportVO.java
@@ -0,0 +1,27 @@
+package org.apache.dolphinscheduler.api.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskExcelImportVO { // NOTE(review): presumably one row of an imported Excel sheet - confirm
+    /** Workflow instance id; declared {@code @NotNull}. */
+    @NotNull(message = "Workflow instance id cannot be null")
+    private Integer workflowInstanceId;
+    /** Workflow instance name; declared {@code @NotNull}. */
+    @NotNull(message = "Workflow instance name cannot be null")
+    private String workflowInstanceName;
+    /** Task code; declared {@code @NotNull}. */
+    @NotNull(message = "Task code cannot be null")
+    private Long taskCode;
+    /** Task name; declared {@code @NotNull}. */
+    @NotNull(message = "Task name cannot be null")
+    private String taskName;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskParseVO.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskParseVO.java
new file mode 100644
index 0000000000..e52379b2cd
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskParseVO.java
@@ -0,0 +1,34 @@
+package org.apache.dolphinscheduler.api.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.dao.dto.TaskSimpleInfoDTO;
+
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskParseVO { // NOTE(review): presumably the result of parsing a coronation task - confirm
+    /** Workflow instance name; declared {@code @NotNull}. */
+    @NotNull(message = "workflow instance name cannot be null")
+    private String workflowInstanceName;
+    /** Workflow instance id; declared {@code @NotNull}. */
+    @NotNull(message = "workflow instance id cannot be null")
+    private Integer workflowInstanceId;
+    /** Task node as a string (NOTE(review): presumably the task name - confirm); declared {@code @NotNull}. */
+    @NotNull(message = "taskNode cannot be null")
+    private String taskNode;
+    /** Task code; declared {@code @NotNull}. */
+    @NotNull(message = "taskCode cannot be null")
+    private Long taskCode;
+    /** Upstream tasks; {@code @NotEmpty} rejects both {@code null} and an empty list, hence the message wording. */
+    @NotEmpty(message = "upstream task nodes cannot be empty")
+    private List<TaskSimpleInfoDTO> upstreamTasks;
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskListingVO.java
similarity index 54%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskListingVO.java
index 8a852aace5..94b69cc9fb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskListingVO.java
@@ -1,8 +1,5 @@
-package org.apache.dolphinscheduler.dao.entity;
+package org.apache.dolphinscheduler.api.vo;
 
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -14,10 +11,8 @@ import java.util.Date;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@TableName("t_ds_isolation_task")
-public class IsolationTask {
+public class IsolationTaskListingVO {
 
-    @TableId(value = "id", type = IdType.AUTO)
     private Long id;
 
     private int workflowInstanceId;
@@ -28,10 +23,9 @@ public class IsolationTask {
 
     private long taskCode;
 
-    private int status;
+    private String taskStatus;
 
     private Date createTime;
 
     private Date updateTime;
-
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index efffbc3842..a851b6dc5d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -341,6 +341,9 @@ public final class Constants {
     public static final String CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS = "RecoveryPausedIsolationTaskInstanceIds";
     public static final String CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS = "RecoveryKilledIsolationTaskInstanceIds";
 
+    public static final String CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS =
+            "RecoveryPausedByCoronationTaskInstanceIds";
+
     public static final String CMD_PARAM_START_PARAMS = "StartParams";
 
     public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 62d538a58b..ca30317808 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -57,6 +57,8 @@ public enum CommandType {
 
     RECOVERY_FROM_ISOLATION_TASKS(13, "recovery from isolation tasks"),
 
+    RECOVERY_FROM_CORONATION_PAUSE_TASKS(14, "recovery from coronation pause tasks"),
+
     ;
 
     CommandType(int code, String descp) {
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index d31c92780c..255c4f833f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -72,6 +72,7 @@ public class JSONUtils {
             .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
             .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
             .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
             .setTimeZone(TimeZone.getDefault())
             .setDateFormat(new SimpleDateFormat(Constants.YYYY_MM_DD_HH_MM_SS));
 
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/CoronationTaskDTO.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/CoronationTaskDTO.java
new file mode 100644
index 0000000000..df22a2fbcf
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/CoronationTaskDTO.java
@@ -0,0 +1,50 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+
+import java.util.Date;
+import java.util.List;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskDTO {
+    /** Id copied from the {@link CoronationTask} entity. */
+    private Long id;
+
+    private int workflowInstanceId;
+
+    private String workflowInstanceName;
+
+    private String taskName;
+
+    private long taskCode;
+    /** Parsed form of the entity's {@code forbiddenUpstreamTasks} string. */
+    private List<TaskSimpleInfoDTO> forbiddenUpstreamTasks;
+
+    private Date createTime;
+
+    private Date updateTime;
+    /** Copies every field of the entity, parsing its forbidden-upstream-task string into DTOs via JSONUtils. */
+    public CoronationTaskDTO(@NonNull CoronationTask coronationTask) {
+        TypeReference<List<TaskSimpleInfoDTO>> listType = new TypeReference<List<TaskSimpleInfoDTO>>() {
+        };
+        this.forbiddenUpstreamTasks = JSONUtils.parseObject(coronationTask.getForbiddenUpstreamTasks(), listType);
+        this.id = coronationTask.getId();
+        this.workflowInstanceId = coronationTask.getWorkflowInstanceId();
+        this.workflowInstanceName = coronationTask.getWorkflowInstanceName();
+        this.taskName = coronationTask.getTaskName();
+        this.taskCode = coronationTask.getTaskCode();
+        this.createTime = coronationTask.getCreateTime();
+        this.updateTime = coronationTask.getUpdateTime();
+    }
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
deleted file mode 100644
index dcc93cc64b..0000000000
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.dolphinscheduler.dao.dto;
-
-public enum IsolationTaskStatus {
-
-    ONLINE(0),
-    OFFLINE(1),
-    ;
-
-    private final int code;
-
-    IsolationTaskStatus(int code) {
-        this.code = code;
-    }
-
-    public int getCode() {
-        return code;
-    }
-
-    public static IsolationTaskStatus of(int code) {
-        for (IsolationTaskStatus value : IsolationTaskStatus.values()) {
-            if (value.getCode() == code) {
-                return value;
-            }
-        }
-        throw new IllegalArgumentException(String.format("Isolation task status code: %s is invalidated: ", code));
-    }
-}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskRuntimeContext.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskRuntimeContext.java
new file mode 100644
index 0000000000..ea454ce543
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskRuntimeContext.java
@@ -0,0 +1,20 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class TaskRuntimeContext {
+    // Isolation flags. NOTE(review): Lombok drops the "is" prefix of boolean fields in accessors - confirm JSON keys.
+    private boolean hasBeenIsolated;
+    private boolean isIsolationTask;
+    // Coronation flags. NOTE(review): "Coronatted" looks like a typo of "Coronated"; a rename changes the accessors.
+    private boolean hasBeenCoronatted;
+    private boolean isCoronationTask;
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
new file mode 100644
index 0000000000..9fe05ca9ca
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskSimpleInfoDTO {
+    // Minimal identification of a task: its name and its code.
+    private String taskName;
+    private long taskCode;
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/CoronationTask.java
similarity index 86%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
copy to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/CoronationTask.java
index 8a852aace5..a7b72203bb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/CoronationTask.java
@@ -14,8 +14,8 @@ import java.util.Date;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@TableName("t_ds_isolation_task")
-public class IsolationTask {
+@TableName("t_ds_coronation_task")
+public class CoronationTask {
 
     @TableId(value = "id", type = IdType.AUTO)
     private Long id;
@@ -28,7 +28,7 @@ public class IsolationTask {
 
     private long taskCode;
 
-    private int status;
+    private String forbiddenUpstreamTasks;
 
     private Date createTime;
 
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
index 8a852aace5..a7aaf4b359 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
@@ -28,8 +28,6 @@ public class IsolationTask {
 
     private long taskCode;
 
-    private int status;
-
     private Date createTime;
 
     private Date updateTime;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 3e1d8cef0f..23ad1de3f0 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -276,6 +276,8 @@ public class TaskInstance implements Serializable {
      */
     private int taskGroupId;
 
+    private String runtimeContext;
+
     public void init(String host, Date startTime, String executePath) {
         this.host = host;
         this.startTime = startTime;
@@ -758,4 +760,12 @@ public class TaskInstance implements Serializable {
     public void setTaskGroupPriority(int taskGroupPriority) {
         this.taskGroupPriority = taskGroupPriority;
     }
+
+    public String getRuntimeContext() {
+        return runtimeContext;
+    }
+
+    public void setRuntimeContext(String runtimeContext) {
+        this.runtimeContext = runtimeContext;
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index c957260a3c..fd1c8d7204 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -57,4 +57,7 @@ public interface CommandMapper extends BaseMapper<Command> {
                                          @Param("thisMasterSlot") int thisMasterSlot);
 
     void batchInsertCommand(List<Command> commands);
+
+    List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(@Param("workflowInstanceId") long workflowInstanceId,
+                                                                     @Param("command_type") int commandType);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
new file mode 100644
index 0000000000..d5829de216
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.dao.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface CoronationTaskMapper extends BaseMapper<CoronationTask> {
+    /** Paged query by workflow instance name and task name; the page type is a wildcard so any Page is accepted. */
+    IPage<CoronationTask> pageQueryCoronationTask(Page<?> page,
+                                                  @Param("workflowInstanceName") String workflowInstanceName,
+                                                  @Param("taskName") String taskName);
+    /** Inserts the given tasks; the list is bound to the statement as {@code coronationTasks}. */
+    void batchInsert(@Param("coronationTasks") List<CoronationTask> coronationTasks);
+    /** Returns the coronation tasks without any filter parameter. */
+    List<CoronationTask> queryAllCoronationTasks();
+    /** Returns a count (NOTE(review): presumably of all coronation task rows - confirm in the mapper statement). */
+    int queryAllCoronationTaskNumber();
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
index babb00c5bb..3bd1045682 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
@@ -10,23 +10,17 @@ import java.util.List;
 
 public interface IsolationTaskMapper extends BaseMapper<IsolationTask> {
 
-    IPage<IsolationTask> pageQuery(@Param("workflowInstanceName") String workflowInstanceName,
-                                   @Param("taskName") String taskName,
-                                   IPage<IsolationTask> page);
+    IPage<IsolationTask> pageQuery(IPage<IsolationTask> page,
+                                   @Param("workflowInstanceName") String workflowInstanceName,
+                                   @Param("taskName") String taskName);
 
     List<IsolationTask> queryByTaskCodes(@NonNull @Param("workflowInstanceId") Integer workflowInstanceId,
                                          @NonNull @Param("taskCodes") List<Long> taskCodes);
 
-    void updateIsolationTaskStatus(@Param("id") long isolationTaskId,
-                                   @Param("status") int status);
-
-    int deleteByIdAndStatus(@Param("id") long isolationTaskId,
-                            @Param("status") int status);
-
-    List<IsolationTask> queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId,
-                                                  @Param("status") int status);
+    List<IsolationTask> queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
 
     void batchInsert(@Param("isolationTasks") List<IsolationTask> isolationTasks);
 
-    List<IsolationTask> queryByStatus(@Param("status") int code);
+    List<IsolationTask> queryAllIsolationTask();
+
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index a2ec345fa0..cf97f42712 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -258,4 +258,6 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
 
     ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode") Long processDefinitionCode,
                                            @Param("state") int state, @Param("id") int id);
+
+    List<ProcessInstance> queryByStatus(@Param("state") int code);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index cf282e1359..23a47177e9 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -39,6 +39,10 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
     List<TaskInstance> findValidTaskListByProcessId(@Param("processInstanceId") Integer processInstanceId,
                                                     @Param("flag") Flag flag);
 
+    List<TaskInstance> findValidTaskListByProcessIdAndTaskStatus(@Param("processInstanceId") Integer processInstanceId,
+                                                                 @Param("status") int status,
+                                                                 @Param("flag") int flag);
+
     List<TaskInstance> queryByHostAndStatus(@Param("host") String host,
                                             @Param("states") int[] stateArray);
 
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
index 278b922082..17445b8e6f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
@@ -7,4 +7,6 @@ import java.util.List;
 public interface CommandDao {
 
     void batchInsertCommand(List<Command> commands);
+
+    List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(long workflowInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
new file mode 100644
index 0000000000..1893e8ffd6
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
@@ -0,0 +1,28 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+
+import java.util.List;
+import java.util.Optional;
+
+public interface CoronationTaskDao {
+    /** Paged query by workflow instance name and task name; page number and size must not be null. */
+    IPage<CoronationTask> pageQueryCoronationTask(String workflowInstanceName,
+                                                  String taskName,
+                                                  @NonNull Integer pageNo,
+                                                  @NonNull Integer pageSize);
+    /** Returns the coronation tasks as DTOs (unlike the other queries, which return entities). */
+    List<CoronationTaskDTO> queryAllCoronationTasks();
+    /** Looks up one coronation task by id; the Optional return type signals it may be absent. */
+    Optional<CoronationTask> queryCoronationTaskById(long id);
+    /** Deletes by id (NOTE(review): the int is presumably the number of deleted rows - confirm). */
+    int deleteById(long id);
+    /** Inserts the given coronation tasks in one call. */
+    void batchInsert(List<CoronationTask> coronationTasks);
+    /** Returns a count (NOTE(review): presumably of all coronation tasks - confirm in the implementation). */
+    int queryAllCoronationTaskNumber();
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
index de16a92be0..b38b30569a 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
@@ -2,12 +2,10 @@ package org.apache.dolphinscheduler.dao.repository;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import lombok.NonNull;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 
 public interface IsolationTaskDao {
 
@@ -19,21 +17,17 @@ public interface IsolationTaskDao {
                                                 int pageNumber,
                                                 int pageSize);
 
+    List<IsolationTask> queryAllIsolationTask();
+
     List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes);
 
-    List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId, IsolationTaskStatus isolationTaskStatus);
+    List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId);
 
     Optional<IsolationTask> queryById(long isolationTaskId);
 
-    List<IsolationTask> queryByIds(List<Long> isolationTaskIds);
-
-    List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus);
-
-    int deleteByIdAndStatus(long id, IsolationTaskStatus status);
+    int deleteById(long id);
 
     void insert(IsolationTask isolationTaskDTO);
 
-    void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus);
-
     void batchInsert(List<IsolationTask> isolationTasks);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
index bd6e140951..7cde663fa7 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
@@ -3,7 +3,9 @@ package org.apache.dolphinscheduler.dao.repository;
 import lombok.NonNull;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 
+import java.util.Optional;
+
 public interface ProcessDefinitionDao {
 
-    ProcessDefinition queryProcessDefinitionByCode(@NonNull Long processDefinitionCode);
+    Optional<ProcessDefinition> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index e789101783..a069340c96 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -19,14 +19,16 @@ package org.apache.dolphinscheduler.dao.repository;
 
 import lombok.NonNull;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
 import java.util.List;
+import java.util.Optional;
 
 public interface ProcessInstanceDao {
 
     List<ProcessInstance> queryProcessInstanceByIds(List<Integer> processInstanceIds);
 
-    ProcessInstance queryProcessInstanceById(@NonNull Integer processInstanceId);
+    Optional<ProcessInstance> queryProcessInstanceById(@NonNull Integer processInstanceId);
 
     int insertProcessInstance(ProcessInstance processInstance);
 
@@ -39,4 +41,5 @@ public interface ProcessInstanceDao {
      */
     int upsertProcessInstance(ProcessInstance processInstance);
 
+    List<ProcessInstance> queryProcessInstanceByStatus(@NonNull ExecutionStatus executionStatus); // looks up process instances by execution status
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index 3918ad8386..971258499d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -3,6 +3,7 @@ package org.apache.dolphinscheduler.dao.repository;
 import lombok.NonNull;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.exception.RepositoryException;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
 import java.util.List;
 
@@ -12,6 +13,9 @@ public interface TaskInstanceDao {
 
     List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId);
 
+    List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceIdAndStatus(@NonNull Integer workflowInstanceId,
+                                                                               @NonNull ExecutionStatus status);
+
     /**
      * Update the taskInstance, if update failed will throw exception.
      *
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
index 79297445d1..2365b5f2bb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
@@ -1,6 +1,7 @@
 package org.apache.dolphinscheduler.dao.repository.impl;
 
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
 import org.apache.dolphinscheduler.dao.repository.CommandDao;
@@ -22,4 +23,10 @@ public class CommandDaoImpl implements CommandDao {
         }
         commandMapper.batchInsertCommand(commands);
     }
+
+    @Override
+    public List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(long workflowInstanceId) {
+        return commandMapper.queryRecoveryCoronationCommandByWorkflowInstanceId(workflowInstanceId,
+                CommandType.RECOVERY_FROM_CORONATION_PAUSE_TASKS.getCode()); // the mapper is given the RECOVERY_FROM_CORONATION_PAUSE_TASKS type code alongside the workflow instance id
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
new file mode 100644
index 0000000000..893b28fbbb
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
@@ -0,0 +1,65 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.mapper.CoronationTaskMapper;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Repository
+public class CoronationTaskDaoImpl implements CoronationTaskDao { // thin repository: every method delegates to CoronationTaskMapper
+
+    @Autowired
+    private CoronationTaskMapper coronationTaskMapper;
+
+    @Override
+    public IPage<CoronationTask> pageQueryCoronationTask(String workflowInstanceName, String taskName,
+                                                         @NonNull Integer pageNo,
+                                                         @NonNull Integer pageSize) {
+        Page<IsolationTask> page = new Page<>(pageNo, pageSize); // NOTE(review): page is typed with IsolationTask although the result is CoronationTask; looks like copy-paste, confirm against the CoronationTaskMapper signature
+        return coronationTaskMapper.pageQueryCoronationTask(page, workflowInstanceName, taskName); // the mapper XML skips a name filter that is null or empty
+    }
+
+    @Override
+    public List<CoronationTaskDTO> queryAllCoronationTasks() {
+        return coronationTaskMapper.queryAllCoronationTasks()
+                .stream()
+                .map(CoronationTaskDTO::new) // wrap each entity row in a CoronationTaskDTO
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Optional<CoronationTask> queryCoronationTaskById(long id) {
+        return Optional.ofNullable(coronationTaskMapper.selectById(id)); // empty Optional when the mapper returns null
+    }
+
+    @Override
+    public int deleteById(long id) {
+        return coronationTaskMapper.deleteById(id);
+    }
+
+    @Override
+    public void batchInsert(List<CoronationTask> coronationTasks) {
+        if (CollectionUtils.isEmpty(coronationTasks)) { // nothing is sent to the mapper for a null or empty list
+            return;
+        }
+        coronationTaskMapper.batchInsert(coronationTasks);
+    }
+
+    @Override
+    public int queryAllCoronationTaskNumber() {
+        return coronationTaskMapper.queryAllCoronationTaskNumber(); // the mapper XML implements this as count(1) over t_ds_coronation_task
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
index effab3aa16..0608adba0b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
@@ -5,7 +5,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper;
 import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
@@ -29,7 +28,12 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
                                                        int pageNumber,
                                                        int pageSize) {
         Page<IsolationTask> page = new Page<>(pageNumber, pageSize);
-        return isolationTaskMapper.pageQuery(workflowInstanceName, taskName, page);
+        return isolationTaskMapper.pageQuery(page, workflowInstanceName, taskName);
+    }
+
+    @Override
+    public List<IsolationTask> queryAllIsolationTask() {
+        return isolationTaskMapper.queryAllIsolationTask();
     }
 
     @Override
@@ -41,9 +45,8 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
     }
 
     @Override
-    public List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId,
-                                                         IsolationTaskStatus isolationTaskStatus) {
-        return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId, isolationTaskStatus.getCode());
+    public List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId) {
+        return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId);
     }
 
     @Override
@@ -52,18 +55,8 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
     }
 
     @Override
-    public List<IsolationTask> queryByIds(List<Long> isolationTaskIds) {
-        return isolationTaskMapper.selectBatchIds(isolationTaskIds);
-    }
-
-    @Override
-    public List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus) {
-        return isolationTaskMapper.queryByStatus(isolationTaskStatus.getCode());
-    }
-
-    @Override
-    public int deleteByIdAndStatus(long id, IsolationTaskStatus status) {
-        return isolationTaskMapper.deleteByIdAndStatus(id, status.getCode());
+    public int deleteById(long id) {
+        return isolationTaskMapper.deleteById(id);
     }
 
     @Override
@@ -71,11 +64,6 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
         isolationTaskMapper.insert(isolationTask);
     }
 
-    @Override
-    public void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus) {
-        isolationTaskMapper.updateIsolationTaskStatus(isolationTaskId, isolationTaskStatus.getCode());
-    }
-
     @Override
     public void batchInsert(List<IsolationTask> isolationTasks) {
         if (CollectionUtils.isEmpty(isolationTasks)) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
index 72dce0b2c3..637b10ad5d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
@@ -7,6 +7,8 @@ import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 
+import java.util.Optional;
+
 @Repository
 public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
 
@@ -14,7 +16,7 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
     private ProcessDefinitionMapper processDefinitionMapper;
 
     @Override
-    public ProcessDefinition queryProcessDefinitionByCode(@NonNull Long processDefinitionCode) {
-        return processDefinitionMapper.queryByCode(processDefinitionCode);
+    public Optional<ProcessDefinition> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode) {
+        return Optional.ofNullable(processDefinitionMapper.queryByCode(processDefinitionCode));
     }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index 84ab280c7f..338566e67e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -23,11 +23,13 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 @Slf4j
 @Repository
@@ -45,8 +47,8 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
     }
 
     @Override
-    public ProcessInstance queryProcessInstanceById(@NonNull Integer processInstanceId) {
-        return processInstanceMapper.selectById(processInstanceId);
+    public Optional<ProcessInstance> queryProcessInstanceById(@NonNull Integer processInstanceId) {
+        return Optional.ofNullable(processInstanceMapper.selectById(processInstanceId));
     }
 
     @Override
@@ -67,4 +69,9 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
             return insertProcessInstance(processInstance);
         }
     }
+
+    @Override
+    public List<ProcessInstance> queryProcessInstanceByStatus(@NonNull ExecutionStatus executionStatus) {
+        return processInstanceMapper.queryByStatus(executionStatus.getCode());
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index 0512980b10..9b07088810 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -8,6 +8,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.exception.RepositoryException;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 
@@ -34,6 +35,13 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
         return taskInstanceMapper.findValidTaskListByProcessId(workflowInstanceId, Flag.YES);
     }
 
+    @Override
+    public List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceIdAndStatus(@NonNull Integer workflowInstanceId,
+                                                                                      @NonNull ExecutionStatus status) {
+        return taskInstanceMapper.findValidTaskListByProcessIdAndTaskStatus(workflowInstanceId, status.getCode(),
+                Flag.YES.getCode()); // uses Flag.YES like the unfiltered query above, but passes codes rather than the enums
+    }
+
     @Override
     public void updateTaskInstance(@NonNull TaskInstance taskInstance) throws RepositoryException {
         try {
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
new file mode 100644
index 0000000000..0967fa71ec
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.CoronationTaskMapper">
+    <sql id="baseSql">
+        id
+        ,workflow_instance_id, workflow_instance_name, task_name, task_code, forbidden_upstream_tasks, create_time, update_time
+    </sql>
+
+    <select id="pageQueryCoronationTask" resultType="org.apache.dolphinscheduler.dao.entity.CoronationTask">
+        select
+        <include refid="baseSql"/>
+        from t_ds_coronation_task
+        where 1 = 1 <!-- always-true predicate so each optional filter below can start with "and" -->
+        <if test="workflowInstanceName != null and workflowInstanceName != ''">
+            and workflow_instance_name = #{workflowInstanceName}
+        </if>
+        <if test="taskName != null and taskName != ''">
+            and task_name = #{taskName}
+        </if>
+    </select>
+
+    <select id="queryAllCoronationTasks" resultType="org.apache.dolphinscheduler.dao.entity.CoronationTask">
+        select
+        <include refid="baseSql"/>
+        from t_ds_coronation_task
+    </select>
+
+    <select id="queryAllCoronationTaskNumber" resultType="java.lang.Integer">
+        select count(1) from t_ds_coronation_task
+    </select>
+
+    <insert id="batchInsert">
+        insert into t_ds_coronation_task (
+        workflow_instance_id,
+        workflow_instance_name,
+        task_name,
+        task_code,
+        forbidden_upstream_tasks
+        )
+        values
+        <foreach collection="coronationTasks" item="coronationTask" separator=",">
+            (
+            #{coronationTask.workflowInstanceId},
+            #{coronationTask.workflowInstanceName},
+            #{coronationTask.taskName},
+            #{coronationTask.taskCode},
+            #{coronationTask.forbiddenUpstreamTasks}
+            )
+        </foreach>
+    </insert>
+
+</mapper>
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
index 4a7100010e..390591be26 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
@@ -20,18 +20,18 @@
 <mapper namespace="org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper">
     <sql id="baseSql">
         id
-        ,workflow_instance_id, workflow_instance_name, task_name, task_code, status, create_time, update_time
+        ,workflow_instance_id, workflow_instance_name, task_name, task_code, create_time, update_time
     </sql>
 
-    <select id="pageQuery" resultType="string">
+    <select id="pageQuery" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
         select
         <include refid="baseSql"/>
         from t_ds_isolation_task
         where 1 = 1
-        <if test="workflowInstanceName != null">
+        <if test="workflowInstanceName != null and workflowInstanceName != ''">
             and workflow_instance_name = #{workflowInstanceName}
         </if>
-        <if test="taskName != null">
+        <if test="taskName != null and taskName != ''">
             and task_name = #{taskName}
         </if>
     </select>
@@ -51,29 +51,21 @@
         select
         <include refid="baseSql"/>
         from t_ds_isolation_task
-        where workflow_instance_id = #{workflowInstanceId} and status = #{status}
+        where workflow_instance_id = #{workflowInstanceId}
     </select>
 
-    <select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+    <select id="queryAllIsolationTask" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
         select
         <include refid="baseSql"/>
         from t_ds_isolation_task
-        where  status = #{status}
     </select>
 
-    <update id="updateIsolationTaskStatus">
-        update t_ds_isolation_task
-        set status = #{status}
-        where id = #{id}
-    </update>
-
     <insert id="batchInsert">
         insert into t_ds_isolation_task (
         workflow_instance_id,
         workflow_instance_name,
         task_name,
-        task_code,
-        status
+        task_code
         )
         values
         <foreach collection="isolationTasks" item="isolationTask" separator=",">
@@ -81,17 +73,9 @@
             #{isolationTask.workflowInstanceId},
             #{isolationTask.workflowInstanceName},
             #{isolationTask.taskName},
-            #{isolationTask.taskCode},
-            #{isolationTask.status}
+            #{isolationTask.taskCode}
             )
         </foreach>
     </insert>
 
-    <delete id="deleteByIdAndStatus">
-        delete
-        from t_ds_isolation_task
-        where id = #{id}
-          and status = #{status}
-    </delete>
-
 </mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 336b966487..7c63a84e2c 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -22,7 +22,7 @@
         id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
         start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
         flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
-        first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id
+        first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id, runtime_context
     </sql>
     <sql id="baseSqlV2">
         ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index 8f6df6738c..134a260a45 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -864,6 +864,7 @@ CREATE TABLE t_ds_task_instance
     task_group_id           int(11) DEFAULT NULL,
     var_pool                longtext,
     dry_run                 int NULL DEFAULT 0,
+    runtime_context         text,
     PRIMARY KEY (id),
     FOREIGN KEY (process_instance_id) REFERENCES t_ds_process_instance (id) ON DELETE CASCADE
 );
@@ -1989,8 +1990,24 @@ CREATE TABLE `t_ds_isolation_task`
     `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
     `task_name`              varchar(255) NOT NULL COMMENT 'isolation task name',
     `task_code`              bigint       NOT NULL COMMENT 'isolation task code',
-    `status`                 tinyint(4) DEFAULT '0',
     `create_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP,
     `update_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
     PRIMARY KEY (`id`)
 );
+
+-- ----------------------------
+-- Table structure for t_ds_coronation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_coronation_task`;
+CREATE TABLE `t_ds_coronation_task`
+(
+    `id`                       bigint       NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `workflow_instance_id`     bigint       NOT NULL COMMENT 'workflowInstanceId of the coronation task',
+    `workflow_instance_name`   varchar(255) NOT NULL COMMENT 'workflowInstanceName of the coronation task',
+    `task_name`                varchar(255) NOT NULL COMMENT 'coronation task name',
+    `task_code`                bigint       NOT NULL COMMENT 'coronation task code',
+    `forbidden_upstream_tasks` text COMMENT 'The task in forbidden_upstream_tasks will be in forbidden execute list',
+    `create_time`              timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+    `update_time`              timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`)
+);
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index e5b69db827..d202084ec2 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -344,7 +344,8 @@ CREATE TABLE `t_ds_command` (
   `environment_code`          bigint(20) DEFAULT '-1' COMMENT 'environment code',
   `dry_run`                   tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run',
   PRIMARY KEY (`id`),
-  KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE
+  KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE,
+  KEY `workflow_instance_id_index` (`process_instance_id`,`command_type`) USING BTREE
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
 -- ----------------------------
@@ -859,6 +860,7 @@ CREATE TABLE `t_ds_task_instance` (
   `var_pool` longtext COMMENT 'var_pool',
   `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
   `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag: 0 normal, 1 dry run',
+  `runtime_context` text COMMENT 'used to store some information used in runtime',
   PRIMARY KEY (`id`),
   KEY `process_instance_id` (`process_instance_id`) USING BTREE,
   KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE,
@@ -1956,9 +1958,26 @@ CREATE TABLE `t_ds_isolation_task`
     `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
     `task_name`              varchar(255) NOT NULL COMMENT 'isolation task name',
     `task_code`              bigint       NOT NULL COMMENT 'isolation task code',
-    `status`                 tinyint(4) DEFAULT '0',
     `create_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP,
     `update_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
     PRIMARY KEY (`id`),
-    KEY `workflow_instance_index` (`workflow_instance_id`) USING BTREE
+    UNIQUE KEY `workflow_instance_task__unique` (`workflow_instance_id`,`task_code`)
+) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
+
+-- ----------------------------
+-- Table structure for t_ds_coronation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_coronation_task`;
+CREATE TABLE `t_ds_coronation_task`
+(
+    `id`                       bigint       NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `workflow_instance_id`     bigint       NOT NULL COMMENT 'workflowInstanceId of the coronation task',
+    `workflow_instance_name`   varchar(255) NOT NULL COMMENT 'workflowInstanceName of the coronation task',
+    `task_name`                varchar(255) NOT NULL COMMENT 'coronation task name',
+    `task_code`                bigint       NOT NULL COMMENT 'coronation task code',
+    `forbidden_upstream_tasks` text COMMENT 'The task in forbidden_upstream_tasks will be in forbidden execute list',
+    `create_time`              timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+    `update_time`              timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `workflow_instance_task__unique` (`workflow_instance_id`,`task_code`)
 ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
similarity index 52%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
index 826a456fd8..1a88860490 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
@@ -1,26 +1,26 @@
-package org.apache.dolphinscheduler.server.master.processor.isolation;
+package org.apache.dolphinscheduler.server.master.processor.coronation;
 
 import io.netty.channel.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class RefreshIsolationTaskProcessor implements NettyRequestProcessor {
+public class RefreshCoronationMetadataProcessor implements NettyRequestProcessor { // on an accepted RPC command, asks CoronationMetadataManager to refresh its coronation task metadata
 
     @Autowired
-    private IsolationTaskManager isolationTaskManager;
+    private CoronationMetadataManager coronationMetadataManager;
 
     @Override
     public void process(Channel channel, Command command) {
-        if (command.getType() != CommandType.REFRESH_ISOLATION_REQUEST) {
-            throw new IllegalArgumentException(String.format("The current rpc command: %s is invalidated", command));
+        if (command.getType() != CommandType.REFRESH_ISOLATION_METADATA_REQUEST) { // NOTE(review): this coronation processor guards on the ISOLATION refresh type, same as the isolation processor it was copied from; confirm whether a coronation-specific CommandType was intended
+            throw new IllegalArgumentException(String.format("The current rpc command : %s is invalidated", command));
         }
-        isolationTaskManager.refreshIsolationTaskMapFromDB();
+        coronationMetadataManager.refreshCoronationTaskMetadata();
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationMetadataProcessor.java
similarity index 71%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationMetadataProcessor.java
index 826a456fd8..19dd54211e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationMetadataProcessor.java
@@ -5,22 +5,22 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class RefreshIsolationTaskProcessor implements NettyRequestProcessor {
+public class RefreshIsolationMetadataProcessor implements NettyRequestProcessor {
 
     @Autowired
-    private IsolationTaskManager isolationTaskManager;
+    private IsolationMetadataManager isolationTaskManager;
 
     @Override
     public void process(Channel channel, Command command) {
-        if (command.getType() != CommandType.REFRESH_ISOLATION_REQUEST) {
+        if (command.getType() != CommandType.REFRESH_ISOLATION_METADATA_REQUEST) {
             throw new IllegalArgumentException(String.format("The current rpc command: %s is invalidated", command));
         }
-        isolationTaskManager.refreshIsolationTaskMapFromDB();
+        isolationTaskManager.refreshIsolationTaskMetadata();
     }
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
similarity index 91%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
index ef3634cd52..3522ac4fec 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.api.remote;
+package org.apache.dolphinscheduler.server.master.rpc;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -11,13 +11,13 @@ import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class ApiServerRPCClient {
+public class MasterRPCClient {
 
     private final NettyRemotingClient client;
 
     private static final long DEFAULT_TIME_OUT_MILLS = 1_000L;
 
-    public ApiServerRPCClient() {
+    public MasterRPCClient() {
         client = new NettyRemotingClient(new NettyClientConfig());
-        log.info("Success initialized ApiServerRPCClient...");
+        log.info("Success initialized MasterRPCClient...");
     }
@@ -32,4 +32,5 @@ public class ApiServerRPCClient {
                                 long timeoutMills) throws RemotingException, InterruptedException {
         client.sendSync(host, rpcCommand, timeoutMills);
     }
+
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index eb4d82cd3d..4644611a7f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.rpc;
 
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -31,9 +30,8 @@ import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningPro
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
 
-import javax.annotation.PostConstruct;
-
-import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationTaskProcessor;
+import org.apache.dolphinscheduler.server.master.processor.coronation.RefreshCoronationMetadataProcessor;
+import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationMetadataProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -76,8 +74,10 @@ public class MasterRPCServer implements AutoCloseable {
     @Autowired
     private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
 
+    @Autowired
+    private RefreshCoronationMetadataProcessor refreshCoronationMetadataProcessor;
     @Autowired
-    private RefreshIsolationTaskProcessor refreshIsolationTaskProcessor;
+    private RefreshIsolationMetadataProcessor refreshIsolationTaskProcessor;
 
     public void start() {
         logger.info("Starting Master RPC Server...");
@@ -101,8 +101,10 @@ public class MasterRPCServer implements AutoCloseable {
         this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
 
-        this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_REQUEST,
+        this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_METADATA_REQUEST,
                 refreshIsolationTaskProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_CORONATION_METADATA_REQUEST,
+                refreshCoronationMetadataProcessor);
 
         this.nettyRemotingServer.start();
         logger.info("Started Master RPC Server...");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 6446248c06..a72b39612d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -22,7 +22,8 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
 import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,10 @@ public class FailoverExecuteThread extends BaseDaemonThread {
     private MasterFailoverService masterFailoverService;
 
     @Autowired
-    private IsolationTaskManager isolationTaskManager;
+    private IsolationMetadataManager isolationMetadataManager;
+
+    @Autowired
+    protected CoronationMetadataManager coronationMetadataManager;
 
     protected FailoverExecuteThread() {
         super("FailoverExecuteThread");
@@ -70,7 +74,8 @@ public class FailoverExecuteThread extends BaseDaemonThread {
                 // todo: DO we need to schedule a task to do this kind of check
                 // This kind of check may only need to be executed when a master server start
                 masterFailoverService.checkMasterFailover();
-                isolationTaskManager.refreshIsolationTaskMapFromDB();
+                isolationMetadataManager.refreshIsolationTaskMetadata();
+                coronationMetadataManager.refreshCoronationTaskMetadata();
             } catch (Exception e) {
                 logger.error("Master failover thread execute error", e);
             } finally {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 1d823a71a7..45427fb402 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@@ -46,6 +45,8 @@ import org.apache.dolphinscheduler.server.master.exception.MasterException;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -110,7 +111,10 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
     private StateEventCallbackService stateEventCallbackService;
 
     @Autowired
-    private IsolationTaskDao isolationTaskDao;
+    private CoronationMetadataManager coronationMetadataManager;
+
+    @Autowired
+    private IsolationMetadataManager isolationMetadataManager;
 
     private String masterAddress;
 
@@ -189,7 +193,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
                                 masterConfig,
                                 stateWheelExecuteThread,
                                 curingGlobalParamsService,
-                                isolationTaskDao);
+                                isolationMetadataManager,
+                                coronationMetadataManager);
                         processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                         workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                                 processInstance.getId()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 24d5d31451..b284c80ef2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -37,10 +37,10 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.dto.TaskRuntimeContext;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.Environment;
-import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
@@ -49,7 +49,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@@ -68,6 +67,8 @@ import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
 import org.apache.dolphinscheduler.server.master.utils.TaskInstanceUtils;
 import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@@ -191,8 +192,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     private final Set<String> stateCleanTaskCodes = new HashSet<>();
 
-    // isolationTaskCode -> isolatedTimes
-    private final Map<Long, Integer> isolationTaskCodesToTimesMap = new HashMap<>();
+    // taskCode -> isolatedTimes
+    // Stores the tasks of this workflow that are isolated, and how many times each one has been marked isolated.
+    private final Map<Long, Integer> isolatedTaskCodesToTimesMap = new HashMap<>();
+    // taskCode -> coronattedTimes (how many times the task has been added to the coronation list)
+    private final Map<Long, Integer> coronattedTaskCodeToTimesMap = new HashMap<>();
 
     /**
      * state event queue
@@ -214,7 +218,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     private final CuringParamsService curingParamsService;
 
-    private final IsolationTaskDao isolationTaskDao;
+    private final IsolationMetadataManager isolationMetadataManager;
+    private final CoronationMetadataManager coronationMetadataManager;
 
     private final String masterAddress;
 
@@ -236,7 +241,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                                    @NonNull MasterConfig masterConfig,
                                    @NonNull StateWheelExecuteThread stateWheelExecuteThread,
                                    @NonNull CuringParamsService curingParamsService,
-                                   @NonNull IsolationTaskDao isolationTaskDao) {
+                                   @NonNull IsolationMetadataManager isolationMetadataManager,
+                                   @NonNull CoronationMetadataManager coronationMetadataManager) {
         this.processService = processService;
         this.processInstanceDao = processInstanceDao;
         this.processInstance = processInstance;
@@ -244,8 +250,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         this.processAlertManager = processAlertManager;
         this.stateWheelExecuteThread = stateWheelExecuteThread;
         this.curingParamsService = curingParamsService;
-        this.isolationTaskDao = isolationTaskDao;
+        this.isolationMetadataManager = isolationMetadataManager;
         this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
+        this.coronationMetadataManager = coronationMetadataManager;
         TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
     }
 
@@ -393,6 +400,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 if (!processInstance.isBlocked()) {
                     submitPostNode(Long.toString(taskInstance.getTaskCode()));
                 }
+                if (coronationMetadataManager.isInCoronationMode()
+                        && coronattedTaskCodeToTimesMap.containsKey(taskInstance.getTaskCode())) {
+                    // The current task is coronation task and is success
+                    // we need to delete the coronation metadata in db, and send request to all master
+                    coronationMetadataManager.deleteCoronationMetadataInDB(taskInstance.getTaskCode());
+                    coronattedTaskCodeToTimesMap.remove(taskInstance.getTaskCode());
+                }
             } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
                 // retry task
                 logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
@@ -453,31 +467,35 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
     }
 
-    public void onlineTaskIsolation(long taskCode) {
+    public void addTaskIsolation(long taskCode) {
         try {
             LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
             // find the post running task
-            Set<String> needToOnlineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
-            // if the current task is finished, kill the post task
-            // we need to submit an online isolation task event, otherwise there may exist concurrent problem
-            for (String isolationTaskCodeStr : needToOnlineIsolationTaskCodes) {
-                Long isolationTaskCode = Long.valueOf(isolationTaskCodeStr);
-                isolationTaskCodesToTimesMap.put(isolationTaskCode,
-                        isolationTaskCodesToTimesMap.getOrDefault(isolationTaskCode, 0) + 1);
-                ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
-                if (iTaskProcessor == null) {
-                    logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}",
-                            isolationTaskCode);
-                    continue;
-                }
-                iTaskProcessor.action(TaskAction.ISOLATE);
-                StateEvent stateEvent = new StateEvent();
-                stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-                stateEvent.setProcessInstanceId(this.processInstance.getId());
-                stateEvent.setTaskInstanceId(iTaskProcessor.taskInstance().getId());
-                stateEvent.setExecutionStatus(iTaskProcessor.taskInstance().getState());
-                this.addStateEvent(stateEvent);
-            }
+            DagHelper.getAllPostNodes(Long.toString(taskCode), dag)
+                    .stream()
+                    .map(Long::parseLong)
+                    .forEach(isolatedTaskCode -> {
+                        // if the current task is finished, kill the post task
+                        // we need to submit an online isolation task event, otherwise there may exist concurrent
+                        // problem
+                        isolatedTaskCodesToTimesMap.put(isolatedTaskCode,
+                                isolatedTaskCodesToTimesMap.getOrDefault(isolatedTaskCode, 0) + 1);
+                        ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolatedTaskCode);
+                        if (iTaskProcessor == null) {
+                            logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}",
+                                    isolatedTaskCode);
+                            return;
+                        }
+                        iTaskProcessor.action(TaskAction.ISOLATE);
+                        StateEvent stateEvent = new StateEvent();
+                        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+                        stateEvent.setProcessInstanceId(this.processInstance.getId());
+                        stateEvent.setTaskInstanceId(iTaskProcessor.taskInstance().getId());
+                        stateEvent.setExecutionStatus(iTaskProcessor.taskInstance().getState());
+                        this.addStateEvent(stateEvent);
+
+                    });
+
         } finally {
             LoggerUtils.removeWorkflowInstanceIdMDC();
         }
@@ -486,12 +504,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     public void cancelTaskIsolation(long taskCode) {
         try {
             LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-            // todo:
             // restart the killed/ paused task
-            Set<String> needToOfflineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
-            for (String needToOfflineIsolationTaskCodeStr : needToOfflineIsolationTaskCodes) {
-                Long isolationTaskCode = Long.valueOf(needToOfflineIsolationTaskCodeStr);
-                Integer isolateTimes = isolationTaskCodesToTimesMap.get(isolationTaskCode);
+            Set<String> needToCancelIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
+            for (String needToCancelIsolationTaskCodeStr : needToCancelIsolationTaskCodes) {
+                Long isolationTaskCode = Long.valueOf(needToCancelIsolationTaskCodeStr);
+                Integer isolateTimes = isolatedTaskCodesToTimesMap.get(isolationTaskCode);
                 if (isolateTimes == null) {
                     logger.warn(
                             "The current task has not been isolated, so it don't need to offline isolation, taskCode: {}",
@@ -499,7 +516,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                     continue;
                 }
                 if (isolateTimes == 1) {
-                    isolationTaskCodesToTimesMap.remove(isolationTaskCode);
+                    isolatedTaskCodesToTimesMap.remove(isolationTaskCode);
                     ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
                     if (iTaskProcessor == null) {
                         // the current task has not been submitted
@@ -521,7 +538,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                         continue;
                     }
                 } else {
-                    isolationTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1);
+                    isolatedTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1);
                 }
 
             }
@@ -899,15 +916,28 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
         // generate process dag
         dag = DagHelper.buildDagGraph(processDag);
-        List<IsolationTask> isolationTasks =
-                isolationTaskDao.queryByWorkflowInstanceId(processInstance.getId(), IsolationTaskStatus.ONLINE);
-        for (IsolationTask isolationTask : isolationTasks) {
-            Set<String> allPostNodes = DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag);
-            allPostNodes.forEach(postNode -> {
-                isolationTaskCodesToTimesMap.put(isolationTask.getTaskCode(),
-                        isolationTaskCodesToTimesMap.getOrDefault(isolationTask.getTaskCode(), 0) + 1);
-            });
-        }
+        isolationMetadataManager.queryIsolationTasksByWorkflowInstanceId(processInstance.getId())
+                .forEach(isolationTask -> {
+                    DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag)
+                            .stream()
+                            .map(Long::parseLong)
+                            .forEach(taskCode -> {
+                                isolatedTaskCodesToTimesMap.put(
+                                        taskCode,
+                                        isolatedTaskCodesToTimesMap.getOrDefault(taskCode, 0) + 1);
+                            });
+                });
+        coronationMetadataManager.getCoronationTasksByWorkflowInstanceId(processInstance.getId())
+                .forEach(coronationTask -> {
+                    DagHelper.getAllPreNodes(Long.toString(coronationTask.getTaskCode()), dag)
+                            .stream()
+                            .map(Long::parseLong)
+                            .forEach(taskCode -> {
+                                coronattedTaskCodeToTimesMap.put(
+                                        taskCode,
+                                        coronattedTaskCodeToTimesMap.getOrDefault(taskCode, 0) + 1);
+                            });
+                });
         logger.info("Build dag success, dag: {}", dag);
     }
 
@@ -1043,11 +1073,28 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      */
     private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
         try {
-            if (isolationTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) {
+            // todo: use TaskInstanceDTO
+            TaskRuntimeContext taskRuntimeContext = new TaskRuntimeContext();
+            if (isolatedTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) {
+                taskRuntimeContext.setIsolationTask(
+                        isolationMetadataManager.isIsolatedTask(processInstance.getId(), taskInstance.getTaskCode()));
+                taskRuntimeContext.setHasBeenIsolated(true);
                 taskInstance.setState(ExecutionStatus.PAUSE_BY_ISOLATION);
                 logger.info("The current task has been isolated, will set status to PAUSE_BY_ISOLATION, taskCode: {}",
                         taskInstance.getTaskCode());
             }
+            if (coronationMetadataManager.isInCoronationMode()) {
+                if (!coronattedTaskCodeToTimesMap.containsKey(taskInstance.getTaskCode())) {
+                    taskInstance.setState(ExecutionStatus.PAUSE_BY_CORONATION);
+                    logger.info(
+                            "The current server mode is in coronation, and the task: {} is not in coronation list, will pause it",
+                            taskInstance.getTaskCode());
+                }
+                taskRuntimeContext.setCoronationTask(coronationMetadataManager.isCoronationTask(processInstance.getId(),
+                        taskInstance.getTaskCode()));
+                taskRuntimeContext.setHasBeenCoronatted(true);
+            }
+            taskInstance.setRuntimeContext(JSONUtils.toJsonString(taskRuntimeContext));
 
             // package task instance before submit
             processService.packageTaskInstance(taskInstance, processInstance);
@@ -1784,6 +1831,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             if (task == null) {
                 continue;
             }
+            if (coronationMetadataManager.isInCoronationMode()
+                    && skipTaskNodeMap.containsKey(Long.toString(task.getTaskCode()))) {
+                logger.info(
+                        "The current server mode is in coronation, and the task: {} is in skip list, will ignore this task",
+                        task.getTaskCode());
+                continue;
+            }
+
             // stop tasks which is retrying if forced success happens
             if (task.taskCanRetry()) {
                 TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
@@ -1921,6 +1976,65 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
     }
 
+    /**
+     * Adds every upstream task of the given coronation task to the coronation list. An upstream task that is
+     * currently paused by coronation is put back on the standby list; upstream tasks in the skip list are ignored.
+     */
+    public void onlineTaskCoronation(CoronationTaskDTO coronationTask) {
+        // set the forbidden task nodes
+        long taskCode = coronationTask.getTaskCode();
+        coronationTask.getForbiddenUpstreamTasks().forEach(forbiddenTaskNode -> {
+            String forbiddenTaskNodeStr = Long.toString(forbiddenTaskNode.getTaskCode());
+            skipTaskNodeMap.put(forbiddenTaskNodeStr, dag.getNode(forbiddenTaskNodeStr));
+        });
+        // find the upstream task
+        Set<String> parentNodes = DagHelper.getAllPreNodes(Long.toString(taskCode), dag);
+        for (String parentNodeCodeStr : parentNodes) {
+            long parentNodeCode = Long.parseLong(parentNodeCodeStr);
+            if (skipTaskNodeMap.containsKey(parentNodeCodeStr)) {
+                logger.info("The parent node: {} is in skip list, no need to coronation", parentNodeCodeStr);
+                continue;
+            }
+            // count per upstream task code (not the coronation task's own code), matching cancelTaskCoronation
+            coronattedTaskCodeToTimesMap.merge(parentNodeCode, 1, Integer::sum);
+            // activeTaskProcessorMaps is looked up by task code; a task that is not submitted yet has no processor
+            ITaskProcessor parentTaskProcessor = activeTaskProcessorMaps.get(parentNodeCode);
+            if (parentTaskProcessor != null) {
+                TaskInstance taskInstance = parentTaskProcessor.taskInstance();
+                if (taskInstance.getState().typeIsPauseByCoronation()) {
+                    // put the task back on the standby list so that it is submitted again
+                    taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+                    logger.info(
+                            "The current task: {} has been added to coronation, will recovery from pause_by_coronation, now set the status to submitted_success, it will submit again",
+                            parentNodeCode);
+                    addTaskToStandByList(taskInstance);
+                }
+            } else {
+                logger.info("The current task: {} has been added to coronation", parentNodeCode);
+            }
+        }
+    }
+
+    /**
+     * Cancels the given coronation task: decrements the count of each upstream task, removing entries whose count is 1.
+     */
+    public void cancelTaskCoronation(CoronationTaskDTO coronationTask) {
+        String coronationTaskCode = Long.toString(coronationTask.getTaskCode());
+        for (String upstreamCodeStr : DagHelper.getAllPreNodes(coronationTaskCode, dag)) {
+            Long upstreamCode = Long.valueOf(upstreamCodeStr);
+            Integer remainingTimes = coronattedTaskCodeToTimesMap.get(upstreamCode);
+            if (remainingTimes == null) {
+                continue;
+            }
+            if (remainingTimes != 1) {
+                coronattedTaskCodeToTimesMap.put(upstreamCode, remainingTimes - 1);
+                continue;
+            }
+            logger.info("The current task: {} has been removed from coronation list", upstreamCode);
+            coronattedTaskCodeToTimesMap.remove(upstreamCode);
+        }
+    }
+
     private enum WorkflowRunnableStatus {
         CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED,
         ;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
new file mode 100644
index 0000000000..68aba94bda
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
@@ -0,0 +1,273 @@
+package org.apache.dolphinscheduler.server.master.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.coronation.RefreshCoronationMetadataRequest;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRPCClient;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.transformer.MasterCommandTransformer;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+public class CoronationMetadataManager {
+
+    @Autowired
+    private CoronationTaskDao coronationTaskDao;
+
+    @Autowired
+    private RegistryClient registryClient;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    @Autowired
+    private MasterCommandTransformer commandTransformer;
+
+    @Autowired
+    private CommandDao commandDao;
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private MasterRPCClient masterRPCClient;
+
+    // Coronation state of this master; within this class it is only written by refreshCoronationTaskMetadata().
+    private volatile CoronationMode coronationMode = CoronationMode.NOT_IN_CORONATION;
+
+    // coronationId -> coronationTask
+    // Snapshot of the last DB query; the whole reference is swapped on every refresh.
+    private volatile Map<Long, CoronationTaskDTO> coronationTaskInMemory = new HashMap<>();
+    // workflowInstanceId -> coronationTasks
+    // NOTE(review): this map and the one below are plain HashMaps that are mutated inside the synchronized
+    // refresh but read by unsynchronized public methods - confirm whether readers can run concurrently.
+    private final Map<Integer, Set<CoronationTaskDTO>> workflowCoronationTaskMap = new HashMap<>();
+    // workflowInstanceId -> coronationTaskCodes
+    private final Map<Integer, Set<Long>> workflowCoronationTaskCodeMap = new HashMap<>();
+
+    /**
+     * Load the coronation metadata from the DB once the bean has been constructed.
+     */
+    @PostConstruct
+    public void init() {
+        refreshCoronationTaskMetadata();
+    }
+
+    /**
+     * Reload all coronation tasks from the DB and reconcile them with the in-memory view.
+     * <p>
+     * Coronation tasks that are new in the DB are registered, coronation tasks that disappeared from the DB
+     * are unregistered, and the coronation mode is switched depending on whether the DB still contains any
+     * coronation task. The method is synchronized, so concurrent refreshes are serialized.
+     */
+    public synchronized void refreshCoronationTaskMetadata() {
+        StopWatch stopWatch = StopWatch.createStarted();
+        Map<Long, CoronationTaskDTO> coronationTaskInDB = coronationTaskDao.queryAllCoronationTasks()
+                .stream()
+                .collect(Collectors.toMap(CoronationTaskDTO::getId, Function.identity()));
+
+        // in DB but not yet in memory -> need to be registered
+        List<CoronationTaskDTO> addCoronationTasks = coronationTaskInDB.values()
+                .stream()
+                .filter(coronationTask -> !coronationTaskInMemory.containsKey(coronationTask.getId()))
+                .collect(Collectors.toList());
+        // in memory but no longer in DB -> need to be unregistered
+        List<CoronationTaskDTO> deleteCoronationTasks = coronationTaskInMemory.values()
+                .stream()
+                .filter(coronationTask -> !coronationTaskInDB.containsKey(coronationTask.getId()))
+                .collect(Collectors.toList());
+        coronationTaskInMemory = coronationTaskInDB;
+        if (coronationTaskInMemory.isEmpty()) {
+            // The last coronation tasks have been removed from the DB: they must be unregistered as well,
+            // otherwise the per-workflow caches keep stale entries and isCoronationTask() keeps returning
+            // true for them. This is done before closing so that a failing close cannot skip the cleanup.
+            cancelCoronationTasks(deleteCoronationTasks);
+            if (coronationMode == CoronationMode.IN_CORONATION) {
+                log.info("There is not coronation tasks, will begin to close coronation mode...");
+                closeCoronation();
+                coronationMode = CoronationMode.NOT_IN_CORONATION;
+                log.info("Close coronation mode success...");
+            }
+        } else {
+            addCoronationTasks(addCoronationTasks);
+            cancelCoronationTasks(deleteCoronationTasks);
+            if (coronationMode == CoronationMode.NOT_IN_CORONATION) {
+                coronationMode = CoronationMode.IN_CORONATION;
+                log.info("Open the coronation mode...");
+            }
+        }
+        stopWatch.stop();
+        log.info("Refresh coronation task from DB finished, cost: {}", stopWatch.getTime());
+    }
+
+    /**
+     * Tell whether the given task code is registered as a coronation task of the given workflow instance.
+     *
+     * @param workflowInstanceId id of the workflow instance
+     * @param taskCode           code of the task
+     * @return {@code true} if the task code is cached for the workflow instance, {@code false} otherwise
+     */
+    public boolean isCoronationTask(int workflowInstanceId, long taskCode) {
+        Set<Long> coronatedTaskCodes = workflowCoronationTaskCodeMap.get(workflowInstanceId);
+        return coronatedTaskCodes != null && coronatedTaskCodes.contains(taskCode);
+    }
+
+    /**
+     * Delete the coronation record of the given task code from the DB.
+     * <p>
+     * Nothing happens when no in-memory coronation task carries this task code. When the DB holds no
+     * coronation task after the deletion, a refresh request is sent to every master found in the registry;
+     * failures of single masters are logged and do not stop the loop.
+     *
+     * @param taskCode code of the task whose coronation record should be deleted
+     */
+    public void deleteCoronationMetadataInDB(long taskCode) {
+        CoronationTaskDTO matchedCoronationTask = coronationTaskInMemory.values()
+                .stream()
+                .filter(candidate -> candidate.getTaskCode() == taskCode)
+                .findAny()
+                .orElse(null);
+        if (matchedCoronationTask == null) {
+            return;
+        }
+        coronationTaskDao.deleteById(matchedCoronationTask.getId());
+        if (coronationTaskDao.queryAllCoronationTaskNumber() != 0) {
+            // other coronation tasks still exist, the masters do not need to be notified
+            return;
+        }
+        // sendCoronationRefreshRequestTo All master
+        try {
+            log.info("There is no coronation tasks, will send refresh coronation meta data to all master");
+            List<Server> allMasters = registryClient.getServerList(NodeType.MASTER);
+            RefreshCoronationMetadataRequest refreshRequest = new RefreshCoronationMetadataRequest();
+            for (Server master : allMasters) {
+                Host masterHost = null;
+                try {
+                    masterHost = new Host(master.getHost(), master.getPort());
+                    masterRPCClient.sendSyncCommand(masterHost, refreshRequest.convert2Command());
+                } catch (Exception e) {
+                    log.error(
+                            "Close coronation failed, send refresh coronation metadata request to master: {} error",
+                            master, e);
+                }
+            }
+        } catch (Exception ex) {
+            log.error(
+                    "Close coronation failed, meet an unknown exception, will wait the master to auto refresh the coronation metadata",
+                    ex);
+        }
+    }
+
+    /**
+     * Close the coronation mode by creating a recovery command for every paused workflow instance that has
+     * not been recovered yet.
+     * <p>
+     * The work is done while holding the master registry lock. If the lock cannot be acquired the method
+     * logs an error and returns without creating any command.
+     */
+    private void closeCoronation() {
+        // The current server is in coronation mode, need to close coronation.
+        // Need to acquire a lock to guarantee there is only one master recovery the pause_by_coronation workflow
+
+        // block to acquire the master lock
+        // The lock is taken outside of the try block so that the finally clause never releases a lock which
+        // was not acquired by this call.
+        if (!registryClient.getLock(NodeType.MASTER.getRegistryPath())) {
+            log.error("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath());
+            return;
+        }
+        try {
+            // find the all instance that need to be recovery
+            // create recovery command
+            // NOTE(review): the workflow instances are selected by PAUSE_BY_ISOLATION while their tasks are
+            // selected by PAUSE_BY_CORONATION - this looks like a copy/paste slip, confirm which workflow
+            // status is really persisted for a coronation pause before changing it.
+            List<Command> needToInsertCommand =
+                    processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_ISOLATION)
+                            .stream()
+                            .filter(processInstance -> {
+                                List<Command> commands = commandDao
+                                        .queryRecoveryCoronationCommandByWorkflowInstanceId(processInstance.getId());
+                                // this workflow instance has not been recovery.
+                                return CollectionUtils.isEmpty(commands);
+                            })
+                            .map(processInstance -> {
+                                List<Integer> needToRecoveryTaskInstanceIds = taskInstanceDao
+                                        .queryValidatedTaskInstanceByWorkflowInstanceIdAndStatus(
+                                                processInstance.getId(), ExecutionStatus.PAUSE_BY_CORONATION)
+                                        .stream()
+                                        .map(TaskInstance::getId)
+                                        .collect(Collectors.toList());
+                                if (needToRecoveryTaskInstanceIds.isEmpty()) {
+                                    // The transformer rejects an empty id list with an IllegalArgumentException,
+                                    // which would abort the recovery of every other workflow instance.
+                                    log.warn(
+                                            "The workflow instance: {} has no task paused by coronation, skip creating recovery command",
+                                            processInstance.getId());
+                                    return null;
+                                }
+                                return commandTransformer.transformToRecoveryFromCoronationPauseCommand(processInstance,
+                                        needToRecoveryTaskInstanceIds);
+                            })
+                            .filter(command -> command != null)
+                            .collect(Collectors.toList());
+            if (needToInsertCommand.isEmpty()) {
+                // nothing to recover, do not send an empty batch to the DAO
+                log.info("There is no workflow instance need to be recovered from coronation pause");
+                return;
+            }
+            commandDao.batchInsertCommand(needToInsertCommand);
+        } finally {
+            registryClient.releaseLock(NodeType.MASTER.getRegistryPath());
+        }
+    }
+
+    /**
+     * Register the given coronation tasks in the per-workflow caches and, when the owning workflow instance
+     * is cached on this master, hand the task over to its {@link WorkflowExecuteRunnable}.
+     * Exceptions raised by a single workflow are logged and do not stop the remaining tasks.
+     *
+     * @param coronationTasksNeedToOnline coronation tasks to register, may be null or empty
+     */
+    private void addCoronationTasks(List<CoronationTaskDTO> coronationTasksNeedToOnline) {
+        if (CollectionUtils.isEmpty(coronationTasksNeedToOnline)) {
+            return;
+        }
+        for (CoronationTaskDTO newCoronationTask : coronationTasksNeedToOnline) {
+            Set<Long> cachedTaskCodes = workflowCoronationTaskCodeMap
+                    .computeIfAbsent(newCoronationTask.getWorkflowInstanceId(), k -> new HashSet<>());
+            cachedTaskCodes.add(newCoronationTask.getTaskCode());
+            Set<CoronationTaskDTO> cachedTasks = workflowCoronationTaskMap
+                    .computeIfAbsent(newCoronationTask.getWorkflowInstanceId(), k -> new HashSet<>());
+            cachedTasks.add(newCoronationTask);
+
+            WorkflowExecuteRunnable runnable =
+                    processInstanceExecCacheManager.getByProcessInstanceId(newCoronationTask.getWorkflowInstanceId());
+            if (runnable != null) {
+                try {
+                    LoggerUtils.setWorkflowInstanceIdMDC(newCoronationTask.getWorkflowInstanceId());
+                    log.info("Begin to add new coronation task: {}", newCoronationTask);
+                    runnable.onlineTaskCoronation(newCoronationTask);
+                } catch (Exception ex) {
+                    log.error("Add new coronation task: {} failed, meet an unknown exception", newCoronationTask, ex);
+                } finally {
+                    LoggerUtils.removeWorkflowInstanceIdMDC();
+                }
+            }
+        }
+    }
+
+    /**
+     * Unregister the given coronation tasks from the per-workflow caches and, when the owning workflow
+     * instance is cached on this master, tell its {@link WorkflowExecuteRunnable} to cancel the coronation.
+     * Exceptions raised by a single workflow are logged and do not stop the remaining tasks.
+     *
+     * @param coronationTasksNeedToOffline coronation tasks to unregister, may be null or empty
+     */
+    private void cancelCoronationTasks(List<CoronationTaskDTO> coronationTasksNeedToOffline) {
+        if (CollectionUtils.isEmpty(coronationTasksNeedToOffline)) {
+            return;
+        }
+        for (CoronationTaskDTO coronationTask : coronationTasksNeedToOffline) {
+            Set<Long> taskCodes = workflowCoronationTaskCodeMap.get(coronationTask.getWorkflowInstanceId());
+            Set<CoronationTaskDTO> coronationTasks =
+                    workflowCoronationTaskMap.get(coronationTask.getWorkflowInstanceId());
+            if (taskCodes != null) {
+                taskCodes.remove(coronationTask.getTaskCode());
+                // the two caches are filled together, but guard anyway so a mismatch cannot abort the refresh
+                if (coronationTasks != null) {
+                    coronationTasks.remove(coronationTask);
+                }
+                if (taskCodes.isEmpty()) {
+                    workflowCoronationTaskCodeMap.remove(coronationTask.getWorkflowInstanceId());
+                    workflowCoronationTaskMap.remove(coronationTask.getWorkflowInstanceId());
+                }
+            }
+            WorkflowExecuteRunnable workflowExecuteRunnable =
+                    processInstanceExecCacheManager.getByProcessInstanceId(coronationTask.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                // the workflow instance is not running on this master, only the caches needed an update
+                continue;
+            }
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(coronationTask.getWorkflowInstanceId());
+                log.info("Begin to cancel coronation task: {}", coronationTask);
+                workflowExecuteRunnable.cancelTaskCoronation(coronationTask);
+            } catch (Exception ex) {
+                log.error("Cancel coronation task: {} failed, meet an unknown exception", coronationTask, ex);
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+    /**
+     * @return {@code true} when the current mode is {@link CoronationMode#IN_CORONATION}
+     */
+    public boolean isInCoronationMode() {
+        return CoronationMode.IN_CORONATION == coronationMode;
+    }
+
+    /**
+     * Look up the coronation tasks cached for a workflow instance.
+     *
+     * @param workflowInstanceId id of the workflow instance
+     * @return the cached set, or an empty set when nothing is cached for the workflow instance
+     */
+    public Set<CoronationTaskDTO> getCoronationTasksByWorkflowInstanceId(int workflowInstanceId) {
+        Set<CoronationTaskDTO> emptyResult = Collections.emptySet();
+        return workflowCoronationTaskMap.getOrDefault(workflowInstanceId, emptyResult);
+    }
+
+    /**
+     * Coronation state of the current server.
+     */
+    public enum CoronationMode {
+
+        IN_CORONATION(0, "The current server is already in coronation mode"),
+        NOT_IN_CORONATION(1, "The current server is not in coronation mode"),
+        ;
+
+        // numeric code of the mode
+        private final int code;
+        // human readable description of the mode
+        private final String desc;
+
+        CoronationMode(int modeCode, String modeDesc) {
+            code = modeCode;
+            desc = modeDesc;
+        }
+
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java
new file mode 100644
index 0000000000..d90a95a42d
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java
@@ -0,0 +1,136 @@
+package org.apache.dolphinscheduler.server.master.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+public class IsolationMetadataManager {
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    // isolationTaskId -> isolationTask, snapshot of the last DB query (the reference is swapped on each refresh)
+    private volatile Map<Long, IsolationTask> isolationTasksInMemory = new HashMap<>();
+    // workflowInstanceId -> isolated task codes
+    private final Map<Integer, Set<Long>> workflowIsolatedTaskCodeMap = new HashMap<>();
+    // workflowInstanceId -> isolation tasks
+    private final Map<Integer, Set<IsolationTask>> workflowIsolatedTaskMap = new HashMap<>();
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
+    /**
+     * Load the isolation metadata from the DB once the bean has been constructed.
+     */
+    @PostConstruct
+    public void init() {
+        refreshIsolationTaskMetadata();
+    }
+
+    /**
+     * Reload all isolation tasks from the DB and reconcile them with the in-memory view.
+     * <p>
+     * Isolation tasks that disappeared from the DB are cancelled first, then isolation tasks that are new
+     * in the DB are added. The method is synchronized, so concurrent refreshes are serialized.
+     */
+    public synchronized void refreshIsolationTaskMetadata() {
+        StopWatch stopWatch = StopWatch.createStarted();
+
+        Map<Long, IsolationTask> isolationTasksInDB =
+                isolationTaskDao.queryAllIsolationTask()
+                        .stream()
+                        .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
+
+        // in memory but no longer in DB -> the isolation has been removed and must be cancelled
+        List<IsolationTask> needToCancelIsolationTasks = isolationTasksInMemory.values()
+                .stream()
+                .filter(isolationTask -> !isolationTasksInDB.containsKey(isolationTask.getId()))
+                .collect(Collectors.toList());
+
+        // in DB but not yet in memory -> a new isolation that must be added
+        List<IsolationTask> needToAddIsolationTasks = isolationTasksInDB.values()
+                .stream()
+                .filter(isolationTask -> !isolationTasksInMemory.containsKey(isolationTask.getId()))
+                .collect(Collectors.toList());
+
+        isolationTasksInMemory = isolationTasksInDB;
+
+        cancelIsolationTask(needToCancelIsolationTasks);
+        addIsolationTask(needToAddIsolationTasks);
+
+        stopWatch.stop();
+        log.info("Refresh isolation task from db finished, cost: {} ms", stopWatch.getTime());
+    }
+
+    /**
+     * Tell whether the given task code is registered as an isolated task of the given workflow instance.
+     *
+     * @param workflowInstanceId id of the workflow instance
+     * @param taskCode           code of the task
+     * @return {@code true} if the task code is cached for the workflow instance, {@code false} otherwise
+     */
+    public boolean isIsolatedTask(int workflowInstanceId, long taskCode) {
+        Set<Long> isolatedTaskCodes = workflowIsolatedTaskCodeMap.get(workflowInstanceId);
+        return isolatedTaskCodes != null && isolatedTaskCodes.contains(taskCode);
+    }
+
+    /**
+     * Unregister the given isolation tasks from the per-workflow caches and, when the owning workflow
+     * instance is cached on this master, tell its {@link WorkflowExecuteRunnable} to cancel the isolation.
+     * <p>
+     * A failure for one workflow is logged and does not stop the remaining tasks: the in-memory snapshot has
+     * already been replaced by the caller, so tasks skipped here would not be picked up by a later refresh.
+     *
+     * @param needCancelIsolationTasks isolation tasks to unregister, may be null or empty
+     */
+    private void cancelIsolationTask(List<IsolationTask> needCancelIsolationTasks) {
+        if (CollectionUtils.isEmpty(needCancelIsolationTasks)) {
+            return;
+        }
+        for (IsolationTask needCancelIsolation : needCancelIsolationTasks) {
+            Set<Long> taskCodes = workflowIsolatedTaskCodeMap.get(needCancelIsolation.getWorkflowInstanceId());
+            Set<IsolationTask> isolationTasks =
+                    workflowIsolatedTaskMap.get(needCancelIsolation.getWorkflowInstanceId());
+            if (taskCodes != null) {
+                taskCodes.remove(needCancelIsolation.getTaskCode());
+                // the two caches are filled together, but guard anyway so a mismatch cannot abort the refresh
+                if (isolationTasks != null) {
+                    isolationTasks.remove(needCancelIsolation);
+                }
+                if (taskCodes.isEmpty()) {
+                    workflowIsolatedTaskCodeMap.remove(needCancelIsolation.getWorkflowInstanceId());
+                    workflowIsolatedTaskMap.remove(needCancelIsolation.getWorkflowInstanceId());
+                }
+            }
+
+            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
+                    .getByProcessInstanceId(needCancelIsolation.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                continue;
+            }
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(needCancelIsolation.getWorkflowInstanceId());
+                workflowExecuteRunnable.cancelTaskIsolation(needCancelIsolation.getTaskCode());
+                log.info("Backend offline isolation task, isolationTaskId: {}", needCancelIsolation.getId());
+            } catch (Exception ex) {
+                log.error("Cancel isolation task: {} failed, meet an unknown exception",
+                        needCancelIsolation.getTaskCode(), ex);
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+    /**
+     * Register the given isolation tasks in the per-workflow caches and, when the owning workflow instance
+     * is cached on this master, hand the task code over to its {@link WorkflowExecuteRunnable}.
+     * Exceptions raised by a single workflow are logged (including the stack trace) and do not stop the
+     * remaining tasks.
+     *
+     * @param needAddIsolationTasks isolation tasks to register, may be null or empty
+     */
+    private void addIsolationTask(List<IsolationTask> needAddIsolationTasks) {
+        if (CollectionUtils.isEmpty(needAddIsolationTasks)) {
+            return;
+        }
+        for (IsolationTask needAddIsolationTask : needAddIsolationTasks) {
+            workflowIsolatedTaskCodeMap
+                    .computeIfAbsent(needAddIsolationTask.getWorkflowInstanceId(), k -> new HashSet<>())
+                    .add(needAddIsolationTask.getTaskCode());
+            workflowIsolatedTaskMap
+                    .computeIfAbsent(needAddIsolationTask.getWorkflowInstanceId(), k -> new HashSet<>())
+                    .add(needAddIsolationTask);
+            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
+                    .getByProcessInstanceId(needAddIsolationTask.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                // the workflow instance is not running on this master, only the caches needed an update
+                continue;
+            }
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(needAddIsolationTask.getWorkflowInstanceId());
+                log.info("Begin to add new isolation task, taskCode: {}", needAddIsolationTask.getTaskCode());
+                workflowExecuteRunnable.addTaskIsolation(needAddIsolationTask.getTaskCode());
+            } catch (Exception ex) {
+                // pass the exception as the last argument so that the stack trace is not lost
+                log.error("Add new isolation task: {} failed, meet an unknown exception",
+                        needAddIsolationTask.getTaskCode(), ex);
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+    /**
+     * Look up the isolation tasks cached for a workflow instance.
+     *
+     * @param workflowInstanceId id of the workflow instance
+     * @return the cached set, or an empty set when nothing is cached for the workflow instance
+     */
+    public Set<IsolationTask> queryIsolationTasksByWorkflowInstanceId(int workflowInstanceId) {
+        Set<IsolationTask> emptyResult = Collections.emptySet();
+        return workflowIsolatedTaskMap.getOrDefault(workflowInstanceId, emptyResult);
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
deleted file mode 100644
index 2621766d84..0000000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.dolphinscheduler.server.master.service;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
-import org.apache.dolphinscheduler.dao.entity.IsolationTask;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.PostConstruct;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-@Slf4j
-@Service
-public class IsolationTaskManager {
-
-    @Autowired
-    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
-
-    private volatile Map<Long, IsolationTask> onlineIsolationTasksInMemory = new HashMap<>();
-
-    private volatile Set<Long> currentIsolationTaskIdsInMemory = new HashSet<>();
-
-    @Autowired
-    private IsolationTaskDao isolationTaskDao;
-
-    @PostConstruct
-    public void init() {
-        refreshIsolationTaskMapFromDB();
-    }
-
-    public void refreshIsolationTaskMapFromDB() {
-        Map<Long, IsolationTask> totalOnlineIsolationTasksInDB =
-                isolationTaskDao.queryByStatus(IsolationTaskStatus.ONLINE)
-                        .stream()
-                        .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
-        Set<Long> totalOnlineIsolationTaskCodesInDB = totalOnlineIsolationTasksInDB.keySet();
-
-        Collection<IsolationTask> needToOfflineIsolationTasks =
-                CollectionUtils.subtract(currentIsolationTaskIdsInMemory, totalOnlineIsolationTaskCodesInDB)
-                        .stream()
-                        .map(onlineIsolationTasksInMemory::get)
-                        .collect(Collectors.toList());
-
-        Collection<IsolationTask> needToOnlineIsolationTasks =
-                CollectionUtils.subtract(totalOnlineIsolationTaskCodesInDB, currentIsolationTaskIdsInMemory)
-                        .stream()
-                        .map(totalOnlineIsolationTasksInDB::get)
-                        .collect(Collectors.toList());
-
-        currentIsolationTaskIdsInMemory = totalOnlineIsolationTaskCodesInDB;
-        onlineIsolationTasksInMemory = totalOnlineIsolationTasksInDB;
-
-        offlineIsolationTask(needToOfflineIsolationTasks);
-        onlineIsolationTask(needToOnlineIsolationTasks);
-    }
-
-    private void offlineIsolationTask(Collection<IsolationTask> needOfflineIsolationTasks) {
-        if (CollectionUtils.isEmpty(needOfflineIsolationTasks)) {
-            return;
-        }
-        for (IsolationTask needOfflineIsolation : needOfflineIsolationTasks) {
-            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
-                    .getByProcessInstanceId(needOfflineIsolation.getWorkflowInstanceId());
-            if (workflowExecuteRunnable == null) {
-                continue;
-            }
-            workflowExecuteRunnable.cancelTaskIsolation(needOfflineIsolation.getTaskCode());
-            log.info("Backend offline isolation task, isolationTaskId: {}", needOfflineIsolation.getId());
-        }
-    }
-
-    private void onlineIsolationTask(Collection<IsolationTask> needOnlineIsolationTasks) {
-        if (CollectionUtils.isEmpty(needOnlineIsolationTasks)) {
-            return;
-        }
-        for (IsolationTask needOnlineIsolationTask : needOnlineIsolationTasks) {
-            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
-                    .getByProcessInstanceId(needOnlineIsolationTask.getWorkflowInstanceId());
-            if (workflowExecuteRunnable == null) {
-                continue;
-            }
-            workflowExecuteRunnable.onlineTaskIsolation(needOnlineIsolationTask.getTaskCode());
-            log.info("Backend online isolation task, isolationTaskId: {}", needOnlineIsolationTask.getId());
-        }
-    }
-
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformer.java
new file mode 100644
index 0000000000..86f19925c8
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformer.java
@@ -0,0 +1,14 @@
+package org.apache.dolphinscheduler.server.master.transformer;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+
+import java.util.List;
+
+/**
+ * Builds {@link Command} objects on the master side.
+ */
+public interface MasterCommandTransformer {
+
+    /**
+     * Build the command used to recover a workflow instance whose tasks were paused by coronation.
+     *
+     * @param processInstance               the workflow instance to recover, must not be null
+     * @param needToRecoveryTaskInstanceIds ids of the task instances to recover, must not be null
+     * @return the recovery command
+     */
+    Command transformToRecoveryFromCoronationPauseCommand(@NonNull ProcessInstance processInstance,
+                                                          @NonNull List<Integer> needToRecoveryTaskInstanceIds);
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformerImpl.java
new file mode 100644
index 0000000000..d8b900093b
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformerImpl.java
@@ -0,0 +1,45 @@
+package org.apache.dolphinscheduler.server.master.transformer;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default {@link MasterCommandTransformer} implementation.
+ */
+@Component
+public class MasterCommandTransformerImpl implements MasterCommandTransformer {
+
+    // NOTE(review): this DAO is injected but not referenced anywhere in this class.
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    /**
+     * Build a {@code RECOVERY_FROM_CORONATION_PAUSE_TASKS} command for the given workflow instance. The ids
+     * of the task instances to recover are stored as a JSON array inside the command parameter.
+     *
+     * @throws IllegalArgumentException when {@code needToRecoveryTaskInstanceIds} is empty
+     */
+    @Override
+    public Command transformToRecoveryFromCoronationPauseCommand(@NonNull ProcessInstance processInstance,
+                                                                 @NonNull List<Integer> needToRecoveryTaskInstanceIds) {
+        if (CollectionUtils.isEmpty(needToRecoveryTaskInstanceIds)) {
+            throw new IllegalArgumentException("The recovery task instance cannot be empty");
+        }
+
+        Command recoveryCommand = new Command();
+        recoveryCommand.setCommandType(CommandType.RECOVERY_FROM_CORONATION_PAUSE_TASKS);
+        // copy the identity and the strategies from the workflow instance
+        recoveryCommand.setProcessInstanceId(processInstance.getId());
+        recoveryCommand.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+        recoveryCommand.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
+        recoveryCommand.setTaskDependType(processInstance.getTaskDependType());
+        recoveryCommand.setFailureStrategy(processInstance.getFailureStrategy());
+
+        Map<String, String> commandParam = new HashMap<>();
+        String taskInstanceIdsJson = JSONUtils.toJsonString(needToRecoveryTaskInstanceIds);
+        commandParam.put(Constants.CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS, taskInstanceIdsJson);
+        recoveryCommand.setCommandParam(JSONUtils.toJsonString(commandParam));
+        return recoveryCommand;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
index 00ae91019d..4bdacfc3a4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
 import static org.apache.dolphinscheduler.common.Constants.COMMA;
@@ -38,6 +39,8 @@ public class WorkflowInstanceUtils {
                 return getStartTaskInstanceIdsFromStateCleanParam(processInstance);
             case RECOVERY_FROM_ISOLATION_TASKS:
                 return getStartTaskInstanceIdsFromRecoverIsolationParam(processInstance);
+            case RECOVERY_FROM_CORONATION_PAUSE_TASKS:
+                return getStartTaskInstanceIdsFromRecoverCoronationParam(processInstance);
             default:
                 return Collections.emptyList();
         }
@@ -87,6 +90,21 @@ public class WorkflowInstanceUtils {
         return result;
     }
 
+    /**
+     * Read the ids of the task instances paused by coronation from the command parameter of the workflow
+     * instance.
+     *
+     * @param processInstance the workflow instance, must not be null
+     * @return the parsed ids, or an empty list when the command parameter is empty or holds no such ids
+     */
+    public static List<Integer> getStartTaskInstanceIdsFromRecoverCoronationParam(@NonNull ProcessInstance processInstance) {
+        Map<String, String> commandParams = JSONUtils.toMap(processInstance.getCommandParam());
+        if (MapUtils.isEmpty(commandParams)) {
+            return Collections.emptyList();
+        }
+        String pausedTaskInstanceIdsJson = commandParams.get(CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS);
+        List<Integer> pausedTaskInstanceIds =
+                JSONUtils.parseObject(pausedTaskInstanceIdsJson, new TypeReference<ArrayList<Integer>>() {
+                });
+        if (pausedTaskInstanceIds != null) {
+            return pausedTaskInstanceIds;
+        }
+        return Collections.emptyList();
+    }
+
     public static List<String> getStartNodeName(@NonNull ProcessInstance processInstance) {
         List<String> startNodeNameList = new ArrayList<>();
         Map<String, String> paramMap = JSONUtils.toMap(processInstance.getCommandParam());
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 856d38cb2f..4e9c9ce9e0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -17,19 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-
-import static org.powermock.api.mockito.PowerMockito.mock;
-
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
-import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
-import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -37,12 +27,25 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.context.ApplicationContext;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -58,15 +61,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.springframework.context.ApplicationContext;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.powermock.api.mockito.PowerMockito.mock;
 
 /**
  * test for WorkflowExecuteThread
@@ -93,7 +92,9 @@ public class WorkflowExecuteRunnableTest {
 
     private CuringParamsService curingGlobalParamsService;
 
-    private IsolationTaskDao isolationTaskDao;
+    private IsolationMetadataManager isolationMetadataManager;
+
+    private CoronationMetadataManager coronationMetadataManager;
 
     @Before
     public void init() throws Exception {
@@ -127,11 +128,12 @@ public class WorkflowExecuteRunnableTest {
         curingGlobalParamsService = mock(CuringParamsService.class);
         NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
         ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
-        isolationTaskDao = mock(IsolationTaskDao.class);
+        isolationMetadataManager = mock(IsolationMetadataManager.class);
+        coronationMetadataManager = mock(CoronationMetadataManager.class);
         workflowExecuteThread =
                 PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao,
                         nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread,
-                        curingGlobalParamsService, isolationTaskDao));
+                        curingGlobalParamsService, isolationMetadataManager, coronationMetadataManager));
         // prepareProcess init dag
         Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
         dag.setAccessible(true);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index daf92071b5..a69da08951 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -121,7 +121,9 @@ public enum CommandType {
      */
     WORKFLOW_EXECUTING_DATA_RESPONSE,
 
-    REFRESH_ISOLATION_REQUEST,
+    REFRESH_ISOLATION_METADATA_REQUEST,
+
+    REFRESH_CORONATION_METADATA_REQUEST,
     ;
 
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/coronation/RefreshCoronationMetadataRequest.java
similarity index 66%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/coronation/RefreshCoronationMetadataRequest.java
index 9b097843c4..4f6d94f2a9 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/coronation/RefreshCoronationMetadataRequest.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.remote.command.isolation;
+package org.apache.dolphinscheduler.remote.command.coronation;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -6,13 +6,14 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 
 import java.io.Serializable;
 
-public class RefreshIsolationTaskRequest implements Serializable {
+public class RefreshCoronationMetadataRequest implements Serializable {
 
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.REFRESH_ISOLATION_REQUEST);
+        command.setType(CommandType.REFRESH_CORONATION_METADATA_REQUEST);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
+
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationMetadataRequest.java
similarity index 68%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationMetadataRequest.java
index 9b097843c4..0241823a08 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationMetadataRequest.java
@@ -1,16 +1,20 @@
 package org.apache.dolphinscheduler.remote.command.isolation;
 
+import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 
 import java.io.Serializable;
 
-public class RefreshIsolationTaskRequest implements Serializable {
+@Data
+@NoArgsConstructor
+public class RefreshIsolationMetadataRequest implements Serializable {
 
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.REFRESH_ISOLATION_REQUEST);
+        command.setType(CommandType.REFRESH_ISOLATION_METADATA_REQUEST);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index c1fe78e78a..43000df020 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1048,10 +1048,9 @@ public class ProcessServiceImpl implements ProcessService {
                 processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 break;
             case RECOVERY_FROM_ISOLATION_TASKS:
-                Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
                 List<Integer> recoveryPausedIsolationIds =
                         JSONUtils.parseObject(
-                                commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
+                                cmdParam.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
                                 new TypeReference<ArrayList<Integer>>() {
                                 });
                 if (CollectionUtils.isNotEmpty(recoveryPausedIsolationIds)) {
@@ -1059,7 +1058,7 @@ public class ProcessServiceImpl implements ProcessService {
                 }
                 List<Integer> recoveryKilledIsolationIds =
                         JSONUtils.parseObject(
-                                commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
+                                cmdParam.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
                                 new TypeReference<ArrayList<Integer>>() {
                                 });
                 if (CollectionUtils.isNotEmpty(recoveryKilledIsolationIds)) {
@@ -1068,6 +1067,22 @@ public class ProcessServiceImpl implements ProcessService {
                 processInstance.setRunTimes(runTime + 1);
                 processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 break;
+            case RECOVERY_FROM_CORONATION_PAUSE_TASKS:
+                List<Integer> pauseByCoronationIds =
+                        JSONUtils.parseObject(cmdParam.get(Constants.CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS),
+                                new TypeReference<ArrayList<Integer>>() {
+                                });
+                // The id list may be missing from the command param (the parse result is then presumably null),
+                // so guard before iterating, exactly as the RECOVERY_FROM_ISOLATION_TASKS branch does.
+                if (CollectionUtils.isNotEmpty(pauseByCoronationIds)) {
+                    for (Integer taskId : pauseByCoronationIds) {
+                        // initialize the pause state
+                        initTaskInstance(this.findTaskInstanceById(taskId));
+                    }
+                }
+                processInstance.setRunTimes(runTime + 1);
+                processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                break;
             default:
                 break;
         }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
index a83184e16b..6f78e37629 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
@@ -67,6 +67,8 @@ public enum ExecutionStatus {
     DISPATCH(17, "dispatch"),
     PAUSE_BY_ISOLATION(18, "paused by isolation"),
     KILL_BY_ISOLATION(19, "killed by isolation"),
+
+    PAUSE_BY_CORONATION(20, "paused by coronation"),
     ;
 
     ExecutionStatus(int code, String descp) {
@@ -120,7 +122,7 @@ public enum ExecutionStatus {
      */
     public boolean typeIsFinished() {
         return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() || typeIsPauseByIsolation()
-                || typeIsStop() || typeIsBlock();
+                || typeIsStop() || typeIsBlock() || typeIsPauseByCoronation();
     }
 
     public boolean typeIsReady() {
@@ -149,6 +151,10 @@ public enum ExecutionStatus {
         return this == PAUSE_BY_ISOLATION;
     }
 
+    public boolean typeIsPauseByCoronation() {
+        return this == PAUSE_BY_CORONATION;
+    }
+
     public boolean typeIsKilledByIsolation() {
         return this == KILL_BY_ISOLATION;
     }