Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/11/01 01:30:25 UTC

[dolphinscheduler] branch dev_wenjun_coronationTask updated (90d94ec1ec -> 040a9eb718)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a change to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


 discard 90d94ec1ec Add coronation task
 discard 809cee638e Implement isolate task api interface
     new 7e7b051852 Catch exception when insert alert failed (#179)
     new 350f1b3825 Support coronation task (#182)
     new 9ef8d4bb05 Fix workerGroup will be updated by WorkerDataListener (#183)
     new 05de0d280c Change requestBody
     new 040a9eb718 Refresh coronation when delete workflow instance

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (90d94ec1ec)
            \
             N -- N -- N   refs/heads/dev_wenjun_coronationTask (040a9eb718)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
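
As a rough illustration of the "discard" and "new" labels above, the following Java sketch treats each branch tip as the set of commits reachable from it and takes the difference in both directions. The class name, the toy graph and the commit labels (B, O1, N1, ...) are hypothetical, and the sketch assumes a linear, single-parent history; it is not how git itself computes the lists.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: classifies commits after a forced push on a toy, single-parent history.
class ForcePushSketch {

    // Collect the tip and every commit reachable from it by following parent links.
    static Set<String> ancestors(Map<String, String> parentOf, String tip) {
        Set<String> seen = new LinkedHashSet<>();
        for (String commit = tip; commit != null; commit = parentOf.get(commit)) {
            seen.add(commit);
        }
        return seen;
    }

    // Commits present in 'a' but absent from 'b'.
    static Set<String> onlyIn(Set<String> a, Set<String> b) {
        Set<String> result = new LinkedHashSet<>(a);
        result.removeAll(b);
        return result;
    }

    public static void main(String[] args) {
        // child -> parent; B is the common base and has no recorded parent here.
        Map<String, String> parentOf = new HashMap<>();
        parentOf.put("O1", "B");
        parentOf.put("O2", "O1");
        parentOf.put("O3", "O2");
        parentOf.put("N1", "B");
        parentOf.put("N2", "N1");
        parentOf.put("N3", "N2");

        Set<String> oldBranch = ancestors(parentOf, "O3");
        Set<String> newBranch = ancestors(parentOf, "N3");

        // Reachable only from the old tip: no longer on the branch after the forced push.
        System.out.println("dropped from branch: " + onlyIn(oldBranch, newBranch));
        // Reachable only from the new tip: the revisions described in follow-up emails.
        System.out.println("new on branch: " + onlyIn(newBranch, oldBranch));
    }
}
```

Whether a dropped revision is reported as "omit" or "discard" depends on whether another reference still points at it, which this sketch does not model.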


Summary of changes:
 .../api/controller/CoronationTaskController.java   | 13 ++++-
 .../api/controller/IsolationTaskController.java    | 16 +++---
 .../dto/request/CoronationTaskListingRequest.java  |  2 +
 .../dto/request/CoronationTaskSubmitRequest.java   |  4 +-
 .../api/remote/ApiServerRPCClient.java             |  5 ++
 .../api/service/IsolationTaskService.java          |  7 ++-
 .../service/impl/CoronationTaskServiceImpl.java    | 10 ++--
 .../api/service/impl/IsolationTaskServiceImpl.java | 33 ++++++++---
 .../service/impl/ProcessInstanceServiceImpl.java   | 12 ++++
 .../api/vo/IsolationTaskListingVO.java             | 12 ++--
 .../dolphinscheduler/common/utils/JSONUtils.java   |  1 +
 .../dao/dto/TaskSimpleInfoDTO.java                 |  4 +-
 .../dolphinscheduler/dao/mapper/CommandMapper.java |  7 ++-
 .../dao/mapper/CoronationTaskMapper.java           |  1 +
 .../dao/mapper/IsolationTaskMapper.java            |  1 +
 .../dao/repository/CoronationTaskDao.java          |  3 +-
 .../dao/repository/IsolationTaskDao.java           |  2 +
 .../dao/repository/impl/CommandDaoImpl.java        |  2 +-
 .../dao/repository/impl/CoronationTaskDaoImpl.java | 11 +++-
 .../dao/repository/impl/IsolationTaskDaoImpl.java  |  5 ++
 .../dolphinscheduler/dao/mapper/CommandMapper.xml  |  9 +++
 .../dao/mapper/CoronationTaskMapper.xml            | 10 +++-
 .../dao/mapper/IsolationTaskMapper.xml             | 12 +++-
 .../dao/mapper/ProcessInstanceMapper.xml           |  6 ++
 .../dao/mapper/TaskInstanceMapper.xml              | 15 ++++-
 .../src/main/resources/sql/dolphinscheduler_h2.sql |  2 +-
 .../RefreshCoronationMetadataProcessor.java        |  2 +-
 .../server/master/registry/ServerNodeManager.java  | 11 +---
 .../server/master/rpc/MasterRPCClient.java         |  4 ++
 .../server/master/rpc/MasterRPCServer.java         |  6 ++
 .../master/runner/WorkflowExecuteRunnable.java     | 47 ++++++++++++----
 .../master/service/CoronationMetadataManager.java  | 17 +++---
 .../master/service/IsolationMetadataManager.java   | 65 +++++++++++-----------
 .../isolation/RefreshIsolationMetadataRequest.java |  4 ++
 .../service/process/ProcessServiceImpl.java        |  6 +-
 35 files changed, 253 insertions(+), 114 deletions(-)
 copy dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java => dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskListingVO.java (56%)


[dolphinscheduler] 02/05: Support coronation task (#182)

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 350f1b3825c3ac3dda0a4b79d571ee6b42a606ba
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Oct 28 14:54:17 2022 +0800

    Support coronation task (#182)
    
    * Implement isolate task api interface
    
    * Add coronation task
---
 .../api/checker/CoronationTaskChecker.java         |  38 +++
 .../api/checker/CoronationTaskCheckerImpl.java     |  99 ++++++++
 .../api/checker/IsolationTaskChecker.java          |   9 -
 .../api/checker/IsolationTaskCheckerImpl.java      |  55 -----
 .../constants/ApiFuncIdentificationConstant.java   |   7 +-
 .../api/controller/CoronationTaskController.java   |  91 +++++++
 .../api/controller/IsolationTaskController.java    |  39 +--
 .../dto/request/CoronationTaskListingRequest.java  |  24 ++
 .../dto/request/CoronationTaskParseRequest.java    |  18 ++
 .../dto/request/CoronationTaskSubmitRequest.java   |  16 ++
 .../apache/dolphinscheduler/api/enums/Status.java  |  13 +
 .../api/remote/ApiServerRPCClient.java             |   5 +
 .../api/service/CoronationTaskService.java         |  30 +++
 .../api/service/IsolationTaskService.java          |  14 +-
 .../service/impl/CoronationTaskServiceImpl.java    | 205 ++++++++++++++++
 .../api/service/impl/IsolationTaskServiceImpl.java |  98 +++-----
 .../api/service/impl/TaskInstanceServiceImpl.java  |   3 +-
 .../api/vo/CoronationTaskExcelImportVO.java        |  27 ++
 .../api/vo/CoronationTaskParseVO.java              |  34 +++
 .../api/vo/IsolationTaskListingVO.java             |  12 +-
 .../apache/dolphinscheduler/common/Constants.java  |   3 +
 .../dolphinscheduler/common/enums/CommandType.java |   2 +
 .../dolphinscheduler/common/utils/JSONUtils.java   |   1 +
 .../dao/dto/CoronationTaskDTO.java                 |  50 ++++
 .../dao/dto/IsolationTaskStatus.java               |  27 --
 .../dao/dto/TaskRuntimeContext.java                |  20 ++
 .../dao/dto/TaskSimpleInfoDTO.java                 |  15 ++
 .../{IsolationTask.java => CoronationTask.java}    |   6 +-
 .../dolphinscheduler/dao/entity/IsolationTask.java |   2 -
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  10 +
 .../dolphinscheduler/dao/mapper/CommandMapper.java |   3 +
 .../dao/mapper/CoronationTaskMapper.java           |  24 ++
 .../dao/mapper/IsolationTaskMapper.java            |  18 +-
 .../dao/mapper/ProcessInstanceMapper.java          |   2 +
 .../dao/mapper/TaskInstanceMapper.java             |   4 +
 .../dao/repository/CommandDao.java                 |   2 +
 .../dao/repository/CoronationTaskDao.java          |  28 +++
 .../dao/repository/IsolationTaskDao.java           |  14 +-
 .../dao/repository/ProcessDefinitionDao.java       |   4 +-
 .../dao/repository/ProcessInstanceDao.java         |   5 +-
 .../dao/repository/TaskInstanceDao.java            |   4 +
 .../dao/repository/impl/CommandDaoImpl.java        |   7 +
 .../dao/repository/impl/CoronationTaskDaoImpl.java |  65 +++++
 .../dao/repository/impl/IsolationTaskDaoImpl.java  |  32 +--
 .../repository/impl/ProcessDefinitionDaoImpl.java  |   6 +-
 .../repository/impl/ProcessInstanceDaoImpl.java    |  11 +-
 .../dao/repository/impl/TaskInstanceDaoImpl.java   |   8 +
 .../dao/mapper/CoronationTaskMapper.xml            |  69 ++++++
 .../dao/mapper/IsolationTaskMapper.xml             |  32 +--
 .../dao/mapper/TaskInstanceMapper.xml              |   2 +-
 .../src/main/resources/sql/dolphinscheduler_h2.sql |  19 +-
 .../main/resources/sql/dolphinscheduler_mysql.sql  |  25 +-
 .../RefreshCoronationMetadataProcessor.java}       |  14 +-
 ...java => RefreshIsolationMetadataProcessor.java} |  10 +-
 .../server/master/rpc/MasterRPCClient.java         |   7 +-
 .../server/master/rpc/MasterRPCServer.java         |  14 +-
 .../master/runner/FailoverExecuteThread.java       |  11 +-
 .../master/runner/MasterSchedulerBootstrap.java    |  11 +-
 .../master/runner/WorkflowExecuteRunnable.java     | 208 ++++++++++++----
 .../master/service/CoronationMetadataManager.java  | 273 +++++++++++++++++++++
 .../master/service/IsolationMetadataManager.java   | 136 ++++++++++
 .../master/service/IsolationTaskManager.java       |  97 --------
 .../transformer/MasterCommandTransformer.java      |  14 ++
 .../transformer/MasterCommandTransformerImpl.java  |  45 ++++
 .../server/master/utils/WorkflowInstanceUtils.java |  18 ++
 .../master/runner/WorkflowExecuteRunnableTest.java |  46 ++--
 .../remote/command/CommandType.java                |   4 +-
 .../RefreshCoronationMetadataRequest.java}         |   7 +-
 ...t.java => RefreshIsolationMetadataRequest.java} |   8 +-
 .../service/process/ProcessServiceImpl.java        |  17 +-
 .../plugin/task/api/enums/ExecutionStatus.java     |   8 +-
 71 files changed, 1815 insertions(+), 490 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskChecker.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskChecker.java
new file mode 100644
index 0000000000..b44feb9556
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskChecker.java
@@ -0,0 +1,38 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.List;
+
+public interface CoronationTaskChecker {
+
+    void checkCanParseCoronationTask(@NonNull User loginUser,
+                                     @NonNull Project project,
+                                     ProcessInstance processInstance,
+                                     @NonNull CoronationTaskExcelImportVO vo) throws ServiceException;
+
+    void checkCanSubmitTaskCoronation(@NonNull User loginUser,
+                                      @NonNull Project project,
+                                      @NonNull ProcessInstance processInstance,
+                                      @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG,
+                                      @NonNull List<CoronationTaskParseVO> voList);
+
+    void checkCanListingTaskCoronation(@NonNull User loginUser,
+                                       long projectCode);
+
+    void checkCanCancelTaskCoronation(@NonNull User loginUser,
+                                      long projectCode,
+                                      @NonNull ProcessInstance processInstance,
+                                      @NonNull CoronationTask coronationTask);
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskCheckerImpl.java
new file mode 100644
index 0000000000..23a6200adb
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskCheckerImpl.java
@@ -0,0 +1,99 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.CORONATION_TASK_CANCEL;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.CORONATION_TASK_PARSE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.CORONATION_TASK_VIEW;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+
+import java.util.List;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class CoronationTaskCheckerImpl implements CoronationTaskChecker {
+
+    @Autowired
+    private ProcessDefinitionDao processDefinitionDao;
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Override
+    public void checkCanParseCoronationTask(@NonNull User loginUser,
+                                            @NonNull Project project,
+                                            @NonNull ProcessInstance processInstance,
+                                            @NonNull CoronationTaskExcelImportVO vo) throws ServiceException {
+        ProcessDefinition processDefinition =
+                processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode())
+                        .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
+                                processInstance.getProcessDefinitionCode()));
+        if (processDefinition.getProjectCode() != project.getCode()) {
+            throw new ServiceException(Status.PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
+        }
+        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CORONATION_TASK_PARSE);
+    }
+
+    @Override
+    public void checkCanSubmitTaskCoronation(@NonNull User loginUser,
+                                             @NonNull Project project,
+                                             @NonNull ProcessInstance processInstance,
+                                             @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG,
+                                             @NonNull List<CoronationTaskParseVO> voList) throws ServiceException {
+        ProcessDefinition processDefinition =
+                processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode())
+                        .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
+                                processInstance.getProcessDefinitionCode()));
+
+        if (processDefinition.getProjectCode() != project.getCode()) {
+            throw new ServiceException(Status.PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
+        }
+        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CORONATION_TASK_PARSE);
+        // todo: check if the vos can submit coronation tasks
+        // if a coronation task already exists in the db, or an upstream task contains a coronation task, we cannot submit
+
+    }
+
+    @Override
+    public void checkCanListingTaskCoronation(@NonNull User loginUser,
+                                              long projectCode) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        projectService.checkProjectAndAuth(loginUser, project, projectCode, CORONATION_TASK_VIEW);
+    }
+
+    @Override
+    public void checkCanCancelTaskCoronation(@NonNull User loginUser, long projectCode,
+                                             @NonNull ProcessInstance processInstance,
+                                             @NonNull CoronationTask coronationTask) {
+        ProcessDefinition processDefinition =
+                processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode())
+                        .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
+                                processInstance.getProcessDefinitionCode()));
+
+        if (processDefinition.getProjectCode() != projectCode) {
+            throw new ServiceException(Status.PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
+        }
+
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        projectService.checkProjectAndAuth(loginUser, project, projectCode, CORONATION_TASK_CANCEL);
+    }
+}
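
The three checks in CoronationTaskCheckerImpl above each repeat the same two steps: look up the process definition, then compare its project code with the expected one. A minimal sketch of that shared step is shown below. Every name in it is a hypothetical stand-in (a plain map replaces ProcessDefinitionDao and a local exception replaces ServiceException), so it shows the shape of the check rather than the code in this commit.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins: none of these names come from the commit.
class OwnershipCheckSketch {

    // Plays the role of ServiceException in this sketch.
    static class CheckFailedException extends RuntimeException {
        CheckFailedException(String message) {
            super(message);
        }
    }

    // Plays the role of ProcessDefinitionDao: process definition code -> owning project code.
    private final Map<Long, Long> projectCodeByDefinitionCode;

    OwnershipCheckSketch(Map<Long, Long> projectCodeByDefinitionCode) {
        this.projectCodeByDefinitionCode = projectCodeByDefinitionCode;
    }

    // Fails if the definition is unknown or is owned by a different project.
    void checkBelongsToProject(long processDefinitionCode, long projectCode) {
        long ownerProjectCode = Optional.ofNullable(projectCodeByDefinitionCode.get(processDefinitionCode))
                .orElseThrow(() -> new CheckFailedException(
                        "process definition " + processDefinitionCode + " does not exist"));
        if (ownerProjectCode != projectCode) {
            throw new CheckFailedException("process instance does not belong to the current project");
        }
    }

    public static void main(String[] args) {
        OwnershipCheckSketch checker = new OwnershipCheckSketch(Map.of(100L, 1L));
        checker.checkBelongsToProject(100L, 1L); // same project: passes silently
        try {
            checker.checkBelongsToProject(100L, 2L); // different project: rejected
        } catch (CheckFailedException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the real class the permission check through projectService.checkProjectAndAuth would still follow this step, with the function identifier chosen per operation.
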
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
index 9ee19f10be..919464290c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
@@ -15,18 +15,9 @@ public interface IsolationTaskChecker {
                                      ProcessInstance processInstance,
                                      @NonNull List<IsolationTaskExcelParseVO> voList);
 
-    void checkCanOnlineTaskIsolation(@NonNull User loginUser,
-                                     long projectCode,
-                                     @NonNull ProcessInstance processInstance,
-                                     @NonNull IsolationTask isolationTask);
-
     void checkCanListingTaskIsolation(@NonNull User loginUser,
                                       long projectCode);
 
-    void checkCanDeleteTaskIsolation(@NonNull User loginUser,
-                                     long projectCode,
-                                     long isolationId);
-
     void checkCanCancelTaskIsolation(@NonNull User loginUser,
                                      long projectCode,
                                      ProcessInstance processInstance,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
index 99402f7fb6..d5d724db85 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
@@ -9,7 +9,6 @@ import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -26,14 +25,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_CANCEL;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_DELETE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_LIST;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_ONLINE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_SUBMIT;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE;
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK;
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST;
@@ -79,22 +72,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
         checkTaskNodeCanSubmitIsolateTask(voList, workflowDAG);
     }
 
-    @Override
-    public void checkCanOnlineTaskIsolation(@NonNull User loginUser,
-                                            long projectCode,
-                                            ProcessInstance processInstance,
-                                            @NonNull IsolationTask isolationTask) {
-        Project project = projectService.queryByCode(loginUser, projectCode);
-        checkOnlineIsolationTaskAuth(loginUser, project, processInstance);
-
-        if (IsolationTaskStatus.ONLINE.getCode() == isolationTask.getStatus()) {
-            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE);
-        }
-
-        String workflowInstanceName = isolationTask.getWorkflowInstanceName();
-        checkWorkflowInstanceCanSubmitIsolateTask(workflowInstanceName, processInstance);
-    }
-
     @Override
     public void checkCanListingTaskIsolation(@NonNull User loginUser, long projectCode) {
         Project project = projectService.queryByCode(loginUser, projectCode);
@@ -102,20 +79,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
         projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_LIST);
     }
 
-    @Override
-    public void checkCanDeleteTaskIsolation(@NonNull User loginUser,
-                                            long projectCode,
-                                            long isolationId) {
-        Project project = projectService.queryByCode(loginUser, projectCode);
-        projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_DELETE);
-
-        IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
-                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
-        if (isolationTask.getStatus() != IsolationTaskStatus.OFFLINE.getCode()) {
-            throw new ServiceException(ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE);
-        }
-    }
-
     @Override
     public void checkCanCancelTaskIsolation(@NonNull User loginUser,
                                             long projectCode,
@@ -127,9 +90,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
         Project project = projectService.queryByCode(loginUser, projectCode);
         checkCancelIsolationTaskAuth(loginUser, project, processInstance);
 
-        if (isolationTask.getStatus() == IsolationTaskStatus.OFFLINE.getCode()) {
-            throw new ServiceException(ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL);
-        }
     }
 
     private void checkWorkflowInstanceCanSubmitIsolateTask(@NonNull String workflowInstanceName,
@@ -173,21 +133,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
         projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_SUBMIT);
     }
 
-    private void checkOnlineIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
-        if (processInstance == null) {
-            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
-        }
-        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
-                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
-                        processInstance.getProcessDefinitionVersion())
-                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
-        if (processDefinitionLog.getProjectCode() != project.getCode()) {
-            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
-        }
-
-        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_ONLINE);
-    }
-
     private void checkCancelIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
         if (processInstance == null) {
             throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
index c8cdd2cb58..6bc0eb475a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.api.constants;
 
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -97,10 +98,12 @@ public class ApiFuncIdentificationConstant {
     public static final String FORCED_SUCCESS = "project:task-instance:force-success";
     public static final String CLEAN_TASK_INSTANCE_STATE = "project:task-instance:clean-state";
     public static final String ISOLATION_TASK_LIST = "project:isolation-task:list";
-    public static final String ISOLATION_TASK_ONLINE = "project:isolation-task:online";
     public static final String ISOLATION_TASK_SUBMIT = "project:isolation-task:submit";
     public static final String ISOLATION_TASK_CANCEL = "project:isolation-task:cancel";
-    public static final String ISOLATION_TASK_DELETE = "project:isolation-task:delete";
+
+    public static final String CORONATION_TASK_VIEW = "project:coronation-task:view";
+    public static final String CORONATION_TASK_PARSE = "project:coronation-task:parse";
+    public static final String CORONATION_TASK_CANCEL = "project:coronation-task:cancel";
     public static final String VIEW_LOG = "project:log:detail";
     public static final String DOWNLOAD_LOG = "project:log:download-log";
     public static final String PROJECT_OVERVIEW = "project:overview:view";
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
new file mode 100644
index 0000000000..80ee7a1107
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
@@ -0,0 +1,91 @@
+package org.apache.dolphinscheduler.api.controller;
+
+import io.swagger.annotations.Api;
+import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskParseRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.service.CoronationTaskService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+import springfox.documentation.annotations.ApiIgnore;
+
+import java.util.List;
+
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_CANCEL_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_DELETE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_LISTING_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_ONLINE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_PARSE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_SUBMIT_ERROR;
+
+@Api(tags = "ISOLATION_TASK_TAG")
+@RestController
+@RequestMapping("/projects/{projectCode}/coronation-task")
+public class CoronationTaskController {
+
+    @Autowired
+    private CoronationTaskService coronationTaskService;
+
+    @PostMapping(value = "/parse")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(CORONATION_TASK_PARSE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<List<CoronationTaskParseVO>> parseCoronationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                                   @PathVariable("projectCode") long projectCode,
+                                                                   @RequestBody CoronationTaskParseRequest request) {
+        return Result.success(coronationTaskService.parseCoronationTask(loginUser, projectCode, request));
+    }
+
+    @PostMapping(value = "/submit")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(CORONATION_TASK_SUBMIT_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> submitCoronationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                             @PathVariable("projectCode") long projectCode,
+                                             @RequestBody CoronationTaskSubmitRequest request) {
+        coronationTaskService.submitCoronationTask(loginUser, projectCode, request);
+        return Result.success(null);
+    }
+
+    @PutMapping(value = "/cancel/{id}")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(CORONATION_TASK_CANCEL_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Void> cancelCoronationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                             @PathVariable("projectCode") long projectCode,
+                                             @PathVariable("id") long id) {
+        coronationTaskService.cancelCoronationTask(loginUser, projectCode, id);
+        return Result.success(null);
+    }
+
+    @GetMapping("")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(CORONATION_TASK_LISTING_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<PageInfo<CoronationTaskDTO>> listingCoronationTasks(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                                      @PathVariable("projectCode") long projectCode,
+                                                                      @RequestBody CoronationTaskListingRequest request) {
+        return Result.success(coronationTaskService.listingCoronationTasks(loginUser, projectCode, request));
+
+    }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
index f4b445456d..ef9c404113 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
@@ -8,12 +8,12 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.IsolationTaskService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskListingVO;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -21,13 +21,11 @@ import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestAttribute;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
 import springfox.documentation.annotations.ApiIgnore;
 
-import javax.validation.Valid;
-
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_LISTING_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR;
 
@@ -50,17 +48,6 @@ public class IsolationTaskController {
         return Result.success(null);
     }
 
-    @PutMapping(value = "/online/{id}")
-    @ResponseStatus(HttpStatus.OK)
-    @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
-    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
-    public Result<Void> onlineIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                            @PathVariable long projectCode,
-                                            @PathVariable(name = "id") long isolationId) {
-        isolationTaskService.onlineTaskIsolation(loginUser, projectCode, isolationId);
-        return Result.success(null);
-    }
-
     @PutMapping(value = "/cancel/{id}")
     @ResponseStatus(HttpStatus.OK)
     @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
@@ -76,21 +63,15 @@ public class IsolationTaskController {
     @ResponseStatus(HttpStatus.OK)
     @ApiException(ISOLATION_TASK_LISTING_ERROR)
     @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
-    public Result<PageInfo<IsolationTask>> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                                                @PathVariable long projectCode,
-                                                                @RequestBody @Valid IsolationTaskListingRequest request) {
+    public Result<PageInfo<IsolationTaskListingVO>> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                                         @PathVariable long projectCode,
+                                                                         @RequestParam(required = false) String workflowInstanceName,
+                                                                         @RequestParam(required = false) String taskName,
+                                                                         @RequestParam Integer pageNo,
+                                                                         @RequestParam Integer pageSize) {
+        IsolationTaskListingRequest request =
+                new IsolationTaskListingRequest(workflowInstanceName, taskName, pageNo, pageSize);
         return Result.success(isolationTaskService.listingTaskIsolation(loginUser, projectCode, request));
     }
 
-    @DeleteMapping("/{id}")
-    @ResponseStatus(HttpStatus.OK)
-    @ApiException(ISOLATION_TASK_DELETE_ERROR)
-    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
-    public Result<Void> deleteIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                            @PathVariable long projectCode,
-                                            @PathVariable long id) {
-        isolationTaskService.deleteTaskIsolation(loginUser, projectCode, id);
-        return Result.success(null);
-    }
-
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
new file mode 100644
index 0000000000..7ecded2c1c
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskListingRequest {
+
+    private String workflowInstanceName;
+
+    private String taskName;
+
+    @NotNull(message = "pageNo cannot be null")
+    private Integer pageNo;
+
+    @NotNull(message = "pageSize cannot be null")
+    private Integer pageSize;
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskParseRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskParseRequest.java
new file mode 100644
index 0000000000..4fa9c384de
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskParseRequest.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+
+import java.util.List;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskParseRequest {
+
+    private List<CoronationTaskExcelImportVO> coronationTasks;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
new file mode 100644
index 0000000000..2597a1206e
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskSubmitRequest {
+
+    private List<CoronationTaskParseVO> coronationTasks;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 602cb477dd..71e462bbf9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -550,6 +550,19 @@ public enum Status {
     ISOLATION_TASK_DELETE_ERROR(1500500, "Delete isolation task error", "删除隔离任务异常"),
     ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE(1500501, "Delete isolation task error, the task status is not offline",
             "删除隔离任务异常,该隔离任务尚未下线"),
+
+    CORONATION_TASK_PARSE_ERROR(1600000, "Coronation task parse error", "解析加冕任务异常"),
+    CORONATION_TASK_NOT_EXIST(1600001, "Coronation task not exist", "加冕任务不存在"),
+    CORONATION_TASK_PARSE_ERROR_TASK_NODE_NAME_IS_NOT_VALIDATED(1600002,
+            "Coronation task parse error, taskNode name is not valid", "解析加冕任务异常, 任务名不正确"),
+    CORONATION_TASK_SUBMIT_ERROR(1600100, "Coronation task submit error", "提交加冕任务异常"),
+    CORONATION_TASK_ONLINE_ERROR(1600200, "Coronation task online error", "上线加冕任务异常"),
+    CORONATION_TASK_CANCEL_ERROR(1600300, "Coronation task cancel error", "取消加冕任务异常"),
+    CORONATION_TASK_LISTING_ERROR(1600400, "Coronation task listing error", "查询加冕任务异常"),
+    CORONATION_TASK_DELETE_ERROR(1600500, "Coronation task delete error", "删除加冕任务异常"),
+
+    PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT(1700000,
+            "The workflow instance doesn't belong to the current project", "工作流实例不属于当前项目"),
             ;
 
     private final int code;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
index ef3634cd52..a0d82e4632 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
@@ -27,6 +27,11 @@ public class ApiServerRPCClient {
         sendSyncCommand(host, rpcCommand, DEFAULT_TIME_OUT_MILLS);
     }
 
+    public void send(@NonNull Host host,
+                     @NonNull Command rpcCommand) throws RemotingException {
+        client.send(host, rpcCommand);
+    }
+
     public void sendSyncCommand(@NonNull Host host,
                                 @NonNull Command rpcCommand,
                                 long timeoutMills) throws RemotingException, InterruptedException {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/CoronationTaskService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/CoronationTaskService.java
new file mode 100644
index 0000000000..a45a843d4a
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/CoronationTaskService.java
@@ -0,0 +1,30 @@
+package org.apache.dolphinscheduler.api.service;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskParseRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.List;
+
+public interface CoronationTaskService {
+
+    List<CoronationTaskParseVO> parseCoronationTask(@NonNull User loginUser,
+                                                    long projectCode,
+                                                    @NonNull CoronationTaskParseRequest request);
+
+    void submitCoronationTask(@NonNull User loginUser,
+                              long projectCode,
+                              @NonNull CoronationTaskSubmitRequest request);
+
+    PageInfo<CoronationTaskDTO> listingCoronationTasks(@NonNull User loginUser,
+                                                       long projectCode,
+                                                       @NonNull CoronationTaskListingRequest request);
+
+    void cancelCoronationTask(@NonNull User loginUser, long projectCode, long id);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
index 7225d19007..16cd6ec225 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
@@ -5,6 +5,7 @@ import org.apache.dolphinscheduler.api.dto.request.IsolationTaskCancelRequest;
 import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
 import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskListingVO;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.User;
 
@@ -14,19 +15,12 @@ public interface IsolationTaskService {
                               long projectCode,
                               @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest);
 
-    void onlineTaskIsolation(@NonNull User loginUser,
-                             long projectCode,
-                             long isolationTaskId);
-
     void cancelTaskIsolation(@NonNull User loginUser,
                              long projectCode,
                              long isolationId);
 
-    PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
-                                                 long projectCode,
-                                                 @NonNull IsolationTaskListingRequest request);
+    PageInfo<IsolationTaskListingVO> listingTaskIsolation(@NonNull User loginUser,
+                                                          long projectCode,
+                                                          @NonNull IsolationTaskListingRequest request);
 
-    void deleteTaskIsolation(@NonNull User loginUser,
-                             long projectCode,
-                             long id);
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
new file mode 100644
index 0000000000..a6c0691c8b
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
@@ -0,0 +1,205 @@
+package org.apache.dolphinscheduler.api.service.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.api.checker.CoronationTaskChecker;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskParseRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.remote.ApiServerRPCClient;
+import org.apache.dolphinscheduler.api.service.CoronationTaskService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.dto.TaskSimpleInfoDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.coronation.RefreshCoronationMetadataRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+public class CoronationTaskServiceImpl implements CoronationTaskService {
+
+    @Autowired
+    private CoronationTaskChecker coronationTaskChecker;
+
+    @Autowired
+    private CoronationTaskDao coronationTaskDao;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private WorkflowDAGService workflowDAGService;
+
+    @Autowired
+    private RegistryClient registryClient;
+
+    @Autowired
+    private ApiServerRPCClient apiServerRPCClient;
+
+    @Override
+    public List<CoronationTaskParseVO> parseCoronationTask(@NonNull User loginUser,
+                                                           long projectCode,
+                                                           @NonNull CoronationTaskParseRequest request) {
+        List<CoronationTaskExcelImportVO> coronationTasks = request.getCoronationTasks();
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        return coronationTasks.stream()
+                .map(vo -> {
+                    ProcessInstance workflowInstance =
+                            processInstanceDao.queryProcessInstanceById(vo.getWorkflowInstanceId())
+                                    .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
+                    coronationTaskChecker.checkCanParseCoronationTask(loginUser, project, workflowInstance, vo);
+                    DAG<String, TaskNode, TaskNodeRelation> workflowDAG =
+                            workflowDAGService.getWorkflowDAG(workflowInstance.getProcessDefinitionCode(),
+                                    workflowInstance.getProcessDefinitionVersion());
+                    if (!vo.getTaskName().equals(workflowDAG.getNode(Long.toString(vo.getTaskCode())).getName())) {
+                        throw new ServiceException(Status.CORONATION_TASK_PARSE_ERROR_TASK_NODE_NAME_IS_NOT_VALIDATED);
+                    }
+                    List<TaskSimpleInfoDTO> previousTaskNodeDTO =
+                            workflowDAG.getPreviousNodes(Long.toString(vo.getTaskCode()))
+                                    .stream()
+                                    .map(previousNodeCode -> {
+                                        TaskNode node = workflowDAG.getNode(previousNodeCode);
+                                        return new TaskSimpleInfoDTO(node.getName(), node.getCode());
+                                    }).collect(Collectors.toList());
+                    return CoronationTaskParseVO.builder()
+                            .workflowInstanceId(workflowInstance.getId())
+                            .workflowInstanceName(workflowInstance.getName())
+                            .taskNode(vo.getTaskName())
+                            .taskCode(vo.getTaskCode())
+                            .upstreamTasks(previousTaskNodeDTO)
+                            .build();
+                }).collect(Collectors.toList());
+    }
+
+    @Override
+    public void submitCoronationTask(@NonNull User loginUser, long projectCode,
+                                     @NonNull CoronationTaskSubmitRequest request) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        Map<Integer, List<CoronationTaskParseVO>> coronationTaskMap = request.getCoronationTasks().stream()
+                .collect(Collectors.groupingBy(CoronationTaskParseVO::getWorkflowInstanceId));
+
+        List<CoronationTask> needToInsertIntoDB = new ArrayList<>();
+
+        for (Map.Entry<Integer, List<CoronationTaskParseVO>> entry : coronationTaskMap.entrySet()) {
+            Integer workflowInstanceId = entry.getKey();
+            List<CoronationTaskParseVO> vos = entry.getValue();
+
+            ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+                    .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
+            DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(
+                    workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion());
+            coronationTaskChecker.checkCanSubmitTaskCoronation(loginUser, project, workflowInstance, workflowDAG, vos);
+
+            List<CoronationTask> coronationTasks = vos.stream()
+                    .map(vo -> {
+                        Set<String> previousNodes = workflowDAG.getPreviousNodes(vo.getTaskCode().toString());
+                        Set<String> selectNodes = vo.getUpstreamTasks()
+                                .stream()
+                                .map(taskNode -> Long.toString(taskNode.getTaskCode()))
+                                .collect(Collectors.toSet());
+                        // Upstream nodes that were not selected will be forbidden from executing
+                        List<TaskSimpleInfoDTO> needToForbiddenTaskCodes =
+                                CollectionUtils.subtract(previousNodes, selectNodes)
+                                        .stream()
+                                        .map(taskNode -> {
+                                            TaskNode node = workflowDAG.getNode(taskNode);
+                                            return new TaskSimpleInfoDTO(node.getName(), node.getCode());
+                                        }).collect(Collectors.toList());
+
+                        return CoronationTask.builder()
+                                .workflowInstanceId(vo.getWorkflowInstanceId())
+                                .workflowInstanceName(vo.getWorkflowInstanceName())
+                                .taskName(vo.getTaskNode())
+                                .taskCode(vo.getTaskCode())
+                                .forbiddenUpstreamTasks(JSONUtils.toJsonString(needToForbiddenTaskCodes))
+                                .build();
+                    }).collect(Collectors.toList());
+            needToInsertIntoDB.addAll(coronationTasks);
+        }
+        coronationTaskDao.batchInsert(needToInsertIntoDB);
+        sendSyncCoronationTasksRequestToMaster();
+    }
+
+    @Override
+    public void cancelCoronationTask(@NonNull User loginUser, long projectCode, long id) {
+        // Load the coronation task and its workflow instance, then check that the cancel is allowed
+        CoronationTask coronationTask = coronationTaskDao.queryCoronationTaskById(id)
+                .orElseThrow(() -> new ServiceException(Status.CORONATION_TASK_NOT_EXIST));
+        ProcessInstance workflowInstance =
+                processInstanceDao.queryProcessInstanceById(coronationTask.getWorkflowInstanceId())
+                        .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
+        coronationTaskChecker.checkCanCancelTaskCoronation(loginUser, projectCode, workflowInstance, coronationTask);
+        coronationTaskDao.deleteById(id);
+        sendSyncCoronationTasksRequestToMaster();
+    }
+
+    @Override
+    public PageInfo<CoronationTaskDTO> listingCoronationTasks(@NonNull User loginUser, long projectCode,
+                                                              @NonNull CoronationTaskListingRequest request) {
+        coronationTaskChecker.checkCanListingTaskCoronation(loginUser, projectCode);
+
+        Integer pageNo = request.getPageNo();
+        Integer pageSize = request.getPageSize();
+
+        IPage<CoronationTask> iPage = coronationTaskDao.pageQueryCoronationTask(
+                request.getWorkflowInstanceName(),
+                request.getTaskName(),
+                pageNo,
+                pageSize);
+
+        PageInfo<CoronationTaskDTO> pageInfo = new PageInfo<>(pageNo, pageSize);
+        pageInfo.setTotal((int) iPage.getTotal());
+        pageInfo.setTotalList(iPage.getRecords().stream().map(CoronationTaskDTO::new).collect(Collectors.toList()));
+
+        return pageInfo;
+    }
+
+    private void sendSyncCoronationTasksRequestToMaster() {
+        List<Server> masters = registryClient.getServerList(NodeType.MASTER);
+        if (CollectionUtils.isEmpty(masters)) {
+            return;
+        }
+        Command command = new RefreshCoronationMetadataRequest().convert2Command();
+        for (Server master : masters) {
+            try {
+                apiServerRPCClient.send(new Host(master.getHost(), master.getPort()), command);
+            } catch (RemotingException e) {
+                log.error("Send RefreshCoronationTask request to master error, master: {}", master, e);
+            }
+        }
+    }
+}
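Note on submitCoronationTask above: the forbidden upstream tasks are the direct predecessors of the coronated task in the DAG minus the upstream tasks the user selected. The following is a minimal standalone sketch of that set difference, not the project's code. The class name ForbiddenUpstreamSketch, the method forbiddenUpstream, and the sample task codes are hypothetical, and plain java.util sets stand in for the DAG and CollectionUtils.subtract calls used in the commit.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class ForbiddenUpstreamSketch {

    // Returns the predecessors that were NOT selected, i.e. previousNodes minus selectedNodes.
    // A TreeSet is used only to make the printed order deterministic.
    static Set<String> forbiddenUpstream(Set<String> previousNodes, Set<String> selectedNodes) {
        Set<String> forbidden = new TreeSet<>(previousNodes);
        forbidden.removeAll(selectedNodes);
        return forbidden;
    }

    public static void main(String[] args) {
        // Hypothetical task codes: the coronated task has three direct predecessors.
        Set<String> previousNodes = new HashSet<>(Arrays.asList("101", "102", "103"));
        // The user chose to keep only task 102 as an upstream dependency.
        Set<String> selectedNodes = new HashSet<>(Arrays.asList("102"));

        // prints [101, 103]
        System.out.println(forbiddenUpstream(previousNodes, selectedNodes));
    }
}
```

In the commit itself, the resulting codes are mapped back to TaskSimpleInfoDTO entries and stored as JSON in forbiddenUpstreamTasks.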
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
index 4f6ff908b1..f3bdcf7600 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
@@ -16,12 +16,12 @@ import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
 import org.apache.dolphinscheduler.api.transformer.CommandTransformer;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskListingVO;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
+import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationMetadataRequest;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -49,7 +49,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR;
 
 @Slf4j
 @Service
@@ -83,7 +82,6 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
     private TaskInstanceDao taskInstanceDao;
 
     @Override
-    @Transactional
     public void submitTaskIsolations(@NonNull User loginUser,
                                      long projectCode,
                                      @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
@@ -100,7 +98,8 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
         for (Map.Entry<Integer, List<IsolationTaskExcelParseVO>> entry : workflow2VoMap.entrySet()) {
             Integer workflowInstanceId = entry.getKey();
             List<IsolationTaskExcelParseVO> vos = entry.getValue();
-            ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+            ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+                    .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
             isolationTaskChecker.checkCanSubmitTaskIsolation(loginUser, projectCode, processInstance, vos);
 
             List<IsolationTask> isolationTasks = entry.getValue().stream().map(vo -> {
@@ -109,36 +108,13 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
                         .workflowInstanceName(vo.getWorkflowInstanceName())
                         .taskName(vo.getTaskName())
                         .taskCode(vo.getTaskCode())
-                        .status(IsolationTaskStatus.ONLINE.getCode())
                         .build();
             }).collect(Collectors.toList());
             needToInsertIntoDB.addAll(isolationTasks);
             needToOnlineIsolations.add(Pair.of(processInstance, isolationTasks));
         }
         isolationTaskDao.batchInsert(needToInsertIntoDB);
-        // we split here to avoid rollback RPC request
-        try {
-            refreshIsolationTasks();
-        } catch (Exception ex) {
-            throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR);
-        }
-    }
-
-    @Override
-    @Transactional
-    public void onlineTaskIsolation(@NonNull User loginUser, long projectCode, long isolationTaskId) {
-        IsolationTask isolationTask = isolationTaskDao.queryById(isolationTaskId)
-                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
-        ProcessInstance processInstance =
-                processInstanceDao.queryProcessInstanceById(isolationTask.getWorkflowInstanceId());
-
-        isolationTaskChecker.checkCanOnlineTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
-        isolationTaskDao.updateIsolationTaskStatus(isolationTaskId, IsolationTaskStatus.ONLINE);
-        try {
-            refreshIsolationTasks();
-        } catch (Exception ex) {
-            throw new ServiceException(Status.ISOLATION_TASK_ONLINE_ERROR);
-        }
+        sendIsolationTaskRefreshRequestToMaster();
     }
 
     @Override
@@ -150,21 +126,18 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
                 .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
 
         Integer workflowInstanceId = isolationTask.getWorkflowInstanceId();
-        ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+        ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+                .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
         isolationTaskChecker.checkCanCancelTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
-        isolationTaskDao.updateIsolationTaskStatus(isolationTask.getId(), IsolationTaskStatus.OFFLINE);
-        insertRecoveryCommandIfNeeded(processInstance, isolationTask);
-        try {
-            refreshIsolationTasks();
-        } catch (RemotingException | InterruptedException e) {
-            throw new ServiceException(Status.ISOLATION_TASK_CANCEL_ERROR);
-        }
+        isolationTaskDao.deleteById(isolationTask.getId());
+        insertRecoveryCommandIfNeeded(processInstance);
+        sendIsolationTaskRefreshRequestToMaster();
     }
 
     @Override
-    public PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
-                                                        long projectCode,
-                                                        @NonNull IsolationTaskListingRequest request) {
+    public PageInfo<IsolationTaskListingVO> listingTaskIsolation(@NonNull User loginUser,
+                                                                 long projectCode,
+                                                                 @NonNull IsolationTaskListingRequest request) {
         isolationTaskChecker.checkCanListingTaskIsolation(loginUser, projectCode);
 
         Integer pageNo = request.getPageNo();
@@ -175,46 +148,48 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
                 request.getTaskName(),
                 pageNo,
                 pageSize);
-
-        PageInfo<IsolationTask> pageInfo = new PageInfo<>(pageNo, pageSize);
+        List<IsolationTaskListingVO> isolationTaskListingVOList = iPage.getRecords()
+                .stream()
+                .map(isolationTask -> {
+                    return IsolationTaskListingVO.builder()
+                            .id(isolationTask.getId())
+                            .workflowInstanceId(isolationTask.getWorkflowInstanceId())
+                            .workflowInstanceName(isolationTask.getWorkflowInstanceName())
+                            // todo: set task status
+                            .taskName(isolationTask.getTaskName())
+                            .taskCode(isolationTask.getTaskCode())
+                            .createTime(isolationTask.getCreateTime())
+                            .updateTime(isolationTask.getUpdateTime())
+                            .build();
+                }).collect(Collectors.toList());
+        PageInfo<IsolationTaskListingVO> pageInfo = new PageInfo<>(pageNo, pageSize);
         pageInfo.setTotal((int) iPage.getTotal());
-        pageInfo.setTotalList(iPage.getRecords());
+        pageInfo.setTotalList(isolationTaskListingVOList);
         return pageInfo;
     }
 
-    @Override
-    public void deleteTaskIsolation(@NonNull User loginUser, long projectCode, long id) {
-        isolationTaskChecker.checkCanDeleteTaskIsolation(loginUser, projectCode, id);
-        int deleteNum = isolationTaskDao.deleteByIdAndStatus(id, IsolationTaskStatus.OFFLINE);
-        if (deleteNum <= 0) {
-            throw new ServiceException(ISOLATION_TASK_NOT_EXIST);
-        }
-    }
-
-    private void refreshIsolationTasks() throws RemotingException, InterruptedException {
+    private void sendIsolationTaskRefreshRequestToMaster() {
         List<Server> masters = registryClient.getServerList(NodeType.MASTER);
         if (CollectionUtils.isEmpty(masters)) {
             return;
         }
 
         org.apache.dolphinscheduler.remote.command.Command refreshIsolationRequest =
-                new RefreshIsolationTaskRequest().convert2Command();
+                new RefreshIsolationMetadataRequest().convert2Command();
         for (Server master : masters) {
             try {
-                apiServerRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
-                        refreshIsolationRequest);
-            } catch (RemotingException | InterruptedException e) {
+                apiServerRPCClient.send(new Host(master.getHost(), master.getPort()), refreshIsolationRequest);
+            } catch (RemotingException e) {
                 log.error("Send RefreshIsolationTask request to master error, master: {}", master, e);
-                throw e;
             }
         }
     }
 
-    private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance,
-                                               @NonNull IsolationTask isolationTask) {
+    private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance) {
         if (processInstance.getState() != ExecutionStatus.PAUSE_BY_ISOLATION) {
             return;
         }
+        log.info("The current workflow instance is in PAUSE_BY_ISOLATION status, will insert a recovery command");
         int workflowInstanceId = processInstance.getId();
         // find the isolationTaskInstanceIds need to recovery
         // find the sub node is in pause or kill
@@ -225,7 +200,7 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
         List<TaskInstance> taskInstances =
                 taskInstanceDao.queryValidatedTaskInstanceByWorkflowInstanceId(workflowInstanceId);
         Set<String> onlineIsolationTaskCodes =
-                isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId, IsolationTaskStatus.ONLINE)
+                isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId)
                         .stream()
                         .map(onlineIsolationTask -> String.valueOf(onlineIsolationTask.getTaskCode()))
                         .collect(Collectors.toSet());
@@ -236,6 +211,7 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
                         onlineIsolationTaskCodes, workflowDAG))
                 .collect(Collectors.toList());
         if (CollectionUtils.isEmpty(canRecoveryTaskInstances)) {
+            log.error("The current workflow instance has no task instance can recovery");
             return;
         }
         // find if this taskInstance still exist pre isolationTasks
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 9d1dc166a2..6c5b49b94d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -242,7 +242,8 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
                     .entrySet()) {
                 Integer workflowInstanceId = processInstanceId2TaskInstanceIds.getKey();
                 List<Integer> needToCleanStateTaskInstanceIds = processInstanceId2TaskInstanceIds.getValue();
-                ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+                ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+                        .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
                 workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, workflowInstance);
                 Command command = commandTransformer.transformToCleanTaskInstanceStateCommand(workflowInstance,
                         needToCleanStateTaskInstanceIds);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskExcelImportVO.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskExcelImportVO.java
new file mode 100644
index 0000000000..dc21207513
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskExcelImportVO.java
@@ -0,0 +1,27 @@
+package org.apache.dolphinscheduler.api.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskExcelImportVO {
+
+    @NotNull(message = "Workflow instance id cannot be null")
+    private Integer workflowInstanceId;
+
+    @NotNull(message = "Workflow instance name cannot be null")
+    private String workflowInstanceName;
+
+    @NotNull(message = "Task code cannot be null")
+    private Long taskCode;
+
+    @NotNull(message = "Task name cannot be null")
+    private String taskName;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskParseVO.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskParseVO.java
new file mode 100644
index 0000000000..e52379b2cd
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskParseVO.java
@@ -0,0 +1,34 @@
+package org.apache.dolphinscheduler.api.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.dao.dto.TaskSimpleInfoDTO;
+
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskParseVO {
+
+    @NotNull(message = "workflow instance cannot be null")
+    private String workflowInstanceName;
+
+    @NotNull(message = "workflow instance id cannot be null")
+    private Integer workflowInstanceId;
+
+    @NotNull(message = "taskNode cannot be null")
+    private String taskNode;
+
+    @NotNull(message = "taskCode cannot be null")
+    private Long taskCode;
+
+    @NotEmpty(message = "upstream task nodes cannot be null")
+    private List<TaskSimpleInfoDTO> upstreamTasks;
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskListingVO.java
similarity index 54%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskListingVO.java
index 8a852aace5..94b69cc9fb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskListingVO.java
@@ -1,8 +1,5 @@
-package org.apache.dolphinscheduler.dao.entity;
+package org.apache.dolphinscheduler.api.vo;
 
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -14,10 +11,8 @@ import java.util.Date;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@TableName("t_ds_isolation_task")
-public class IsolationTask {
+public class IsolationTaskListingVO {
 
-    @TableId(value = "id", type = IdType.AUTO)
     private Long id;
 
     private int workflowInstanceId;
@@ -28,10 +23,9 @@ public class IsolationTask {
 
     private long taskCode;
 
-    private int status;
+    private String taskStatus;
 
     private Date createTime;
 
     private Date updateTime;
-
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index efffbc3842..a851b6dc5d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -341,6 +341,9 @@ public final class Constants {
     public static final String CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS = "RecoveryPausedIsolationTaskInstanceIds";
     public static final String CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS = "RecoveryKilledIsolationTaskInstanceIds";
 
+    public static final String CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS =
+            "RecoveryPausedByCoronationTaskInstanceIds";
+
     public static final String CMD_PARAM_START_PARAMS = "StartParams";
 
     public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 62d538a58b..ca30317808 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -57,6 +57,8 @@ public enum CommandType {
 
     RECOVERY_FROM_ISOLATION_TASKS(13, "recovery from isolation tasks"),
 
+    RECOVERY_FROM_CORONATION_PAUSE_TASKS(14, "recovery from coronation pause tasks"),
+
     ;
 
     CommandType(int code, String descp) {
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index d31c92780c..255c4f833f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -72,6 +72,7 @@ public class JSONUtils {
             .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
             .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
             .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
             .setTimeZone(TimeZone.getDefault())
             .setDateFormat(new SimpleDateFormat(Constants.YYYY_MM_DD_HH_MM_SS));
 
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/CoronationTaskDTO.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/CoronationTaskDTO.java
new file mode 100644
index 0000000000..df22a2fbcf
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/CoronationTaskDTO.java
@@ -0,0 +1,50 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+
+import java.util.Date;
+import java.util.List;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskDTO {
+
+    private Long id;
+
+    private int workflowInstanceId;
+
+    private String workflowInstanceName;
+
+    private String taskName;
+
+    private long taskCode;
+
+    private List<TaskSimpleInfoDTO> forbiddenUpstreamTasks;
+
+    private Date createTime;
+
+    private Date updateTime;
+
+    public CoronationTaskDTO(@NonNull CoronationTask coronationTask) {
+        this.id = coronationTask.getId();
+        this.workflowInstanceId = coronationTask.getWorkflowInstanceId();
+        this.workflowInstanceName = coronationTask.getWorkflowInstanceName();
+        this.taskName = coronationTask.getTaskName();
+        this.taskCode = coronationTask.getTaskCode();
+        this.forbiddenUpstreamTasks = JSONUtils.parseObject(coronationTask.getForbiddenUpstreamTasks(),
+                new TypeReference<List<TaskSimpleInfoDTO>>() {
+                });
+        this.createTime = coronationTask.getCreateTime();
+        this.updateTime = coronationTask.getUpdateTime();
+    }
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
deleted file mode 100644
index dcc93cc64b..0000000000
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.dolphinscheduler.dao.dto;
-
-public enum IsolationTaskStatus {
-
-    ONLINE(0),
-    OFFLINE(1),
-    ;
-
-    private final int code;
-
-    IsolationTaskStatus(int code) {
-        this.code = code;
-    }
-
-    public int getCode() {
-        return code;
-    }
-
-    public static IsolationTaskStatus of(int code) {
-        for (IsolationTaskStatus value : IsolationTaskStatus.values()) {
-            if (value.getCode() == code) {
-                return value;
-            }
-        }
-        throw new IllegalArgumentException(String.format("Isolation task status code: %s is invalidated: ", code));
-    }
-}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskRuntimeContext.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskRuntimeContext.java
new file mode 100644
index 0000000000..ea454ce543
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskRuntimeContext.java
@@ -0,0 +1,20 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class TaskRuntimeContext {
+
+    private boolean hasBeenIsolated;
+    private boolean isIsolationTask;
+
+    private boolean hasBeenCoronatted;
+    private boolean isCoronationTask;
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
new file mode 100644
index 0000000000..9fe05ca9ca
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskSimpleInfoDTO {
+
+    private String taskName;
+    private long taskCode;
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/CoronationTask.java
similarity index 86%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
copy to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/CoronationTask.java
index 8a852aace5..a7b72203bb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/CoronationTask.java
@@ -14,8 +14,8 @@ import java.util.Date;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@TableName("t_ds_isolation_task")
-public class IsolationTask {
+@TableName("t_ds_coronation_task")
+public class CoronationTask {
 
     @TableId(value = "id", type = IdType.AUTO)
     private Long id;
@@ -28,7 +28,7 @@ public class IsolationTask {
 
     private long taskCode;
 
-    private int status;
+    private String forbiddenUpstreamTasks;
 
     private Date createTime;
 
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
index 8a852aace5..a7aaf4b359 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
@@ -28,8 +28,6 @@ public class IsolationTask {
 
     private long taskCode;
 
-    private int status;
-
     private Date createTime;
 
     private Date updateTime;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 3e1d8cef0f..23ad1de3f0 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -276,6 +276,8 @@ public class TaskInstance implements Serializable {
      */
     private int taskGroupId;
 
+    private String runtimeContext;
+
     public void init(String host, Date startTime, String executePath) {
         this.host = host;
         this.startTime = startTime;
@@ -758,4 +760,12 @@ public class TaskInstance implements Serializable {
     public void setTaskGroupPriority(int taskGroupPriority) {
         this.taskGroupPriority = taskGroupPriority;
     }
+
+    public String getRuntimeContext() {
+        return runtimeContext;
+    }
+
+    public void setRuntimeContext(String runtimeContext) {
+        this.runtimeContext = runtimeContext;
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index c957260a3c..fd1c8d7204 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -57,4 +57,7 @@ public interface CommandMapper extends BaseMapper<Command> {
                                          @Param("thisMasterSlot") int thisMasterSlot);
 
     void batchInsertCommand(List<Command> commands);
+
+    List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(@Param("workflowInstanceId") long workflowInstanceId,
+                                                                     @Param("command_type") int commandType);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
new file mode 100644
index 0000000000..d5829de216
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.dao.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface CoronationTaskMapper extends BaseMapper<CoronationTask> {
+
+    IPage<CoronationTask> pageQueryCoronationTask(Page<IsolationTask> page,
+                                                  @Param("workflowInstanceName") String workflowInstanceName,
+                                                  @Param("taskName") String taskName);
+
+    void batchInsert(@Param("coronationTasks") List<CoronationTask> coronationTasks);
+
+    List<CoronationTask> queryAllCoronationTasks();
+
+    int queryAllCoronationTaskNumber();
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
index babb00c5bb..3bd1045682 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
@@ -10,23 +10,17 @@ import java.util.List;
 
 public interface IsolationTaskMapper extends BaseMapper<IsolationTask> {
 
-    IPage<IsolationTask> pageQuery(@Param("workflowInstanceName") String workflowInstanceName,
-                                   @Param("taskName") String taskName,
-                                   IPage<IsolationTask> page);
+    IPage<IsolationTask> pageQuery(IPage<IsolationTask> page,
+                                   @Param("workflowInstanceName") String workflowInstanceName,
+                                   @Param("taskName") String taskName);
 
     List<IsolationTask> queryByTaskCodes(@NonNull @Param("workflowInstanceId") Integer workflowInstanceId,
                                          @NonNull @Param("taskCodes") List<Long> taskCodes);
 
-    void updateIsolationTaskStatus(@Param("id") long isolationTaskId,
-                                   @Param("status") int status);
-
-    int deleteByIdAndStatus(@Param("id") long isolationTaskId,
-                            @Param("status") int status);
-
-    List<IsolationTask> queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId,
-                                                  @Param("status") int status);
+    List<IsolationTask> queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
 
     void batchInsert(@Param("isolationTasks") List<IsolationTask> isolationTasks);
 
-    List<IsolationTask> queryByStatus(@Param("status") int code);
+    List<IsolationTask> queryAllIsolationTask();
+
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index a2ec345fa0..cf97f42712 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -258,4 +258,6 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
 
     ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode") Long processDefinitionCode,
                                            @Param("state") int state, @Param("id") int id);
+
+    List<ProcessInstance> queryByStatus(@Param("state") int code);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index cf282e1359..23a47177e9 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -39,6 +39,10 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
     List<TaskInstance> findValidTaskListByProcessId(@Param("processInstanceId") Integer processInstanceId,
                                                     @Param("flag") Flag flag);
 
+    List<TaskInstance> findValidTaskListByProcessIdAndTaskStatus(@Param("processInstanceId") Integer processInstanceId,
+                                                                 @Param("status") int status,
+                                                                 @Param("flag") int flag);
+
     List<TaskInstance> queryByHostAndStatus(@Param("host") String host,
                                             @Param("states") int[] stateArray);
 
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
index 278b922082..17445b8e6f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
@@ -7,4 +7,6 @@ import java.util.List;
 public interface CommandDao {
 
     void batchInsertCommand(List<Command> commands);
+
+    List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(long workflowInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
new file mode 100644
index 0000000000..1893e8ffd6
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
@@ -0,0 +1,28 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+
+import java.util.List;
+import java.util.Optional;
+
+public interface CoronationTaskDao {
+
+    IPage<CoronationTask> pageQueryCoronationTask(String workflowInstanceName,
+                                                  String taskName,
+                                                  @NonNull Integer pageNo,
+                                                  @NonNull Integer pageSize);
+
+    List<CoronationTaskDTO> queryAllCoronationTasks();
+
+    Optional<CoronationTask> queryCoronationTaskById(long id);
+
+    int deleteById(long id);
+
+    void batchInsert(List<CoronationTask> coronationTasks);
+
+    int queryAllCoronationTaskNumber();
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
index de16a92be0..b38b30569a 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
@@ -2,12 +2,10 @@ package org.apache.dolphinscheduler.dao.repository;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import lombok.NonNull;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 
 public interface IsolationTaskDao {
 
@@ -19,21 +17,17 @@ public interface IsolationTaskDao {
                                                 int pageNumber,
                                                 int pageSize);
 
+    List<IsolationTask> queryAllIsolationTask();
+
     List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes);
 
-    List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId, IsolationTaskStatus isolationTaskStatus);
+    List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId);
 
     Optional<IsolationTask> queryById(long isolationTaskId);
 
-    List<IsolationTask> queryByIds(List<Long> isolationTaskIds);
-
-    List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus);
-
-    int deleteByIdAndStatus(long id, IsolationTaskStatus status);
+    int deleteById(long id);
 
     void insert(IsolationTask isolationTaskDTO);
 
-    void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus);
-
     void batchInsert(List<IsolationTask> isolationTasks);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
index bd6e140951..7cde663fa7 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
@@ -3,7 +3,9 @@ package org.apache.dolphinscheduler.dao.repository;
 import lombok.NonNull;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 
+import java.util.Optional;
+
 public interface ProcessDefinitionDao {
 
-    ProcessDefinition queryProcessDefinitionByCode(@NonNull Long processDefinitionCode);
+    Optional<ProcessDefinition> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index e789101783..a069340c96 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -19,14 +19,16 @@ package org.apache.dolphinscheduler.dao.repository;
 
 import lombok.NonNull;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
 import java.util.List;
+import java.util.Optional;
 
 public interface ProcessInstanceDao {
 
     List<ProcessInstance> queryProcessInstanceByIds(List<Integer> processInstanceIds);
 
-    ProcessInstance queryProcessInstanceById(@NonNull Integer processInstanceId);
+    Optional<ProcessInstance> queryProcessInstanceById(@NonNull Integer processInstanceId);
 
     int insertProcessInstance(ProcessInstance processInstance);
 
@@ -39,4 +41,5 @@ public interface ProcessInstanceDao {
      */
     int upsertProcessInstance(ProcessInstance processInstance);
 
+    List<ProcessInstance> queryProcessInstanceByStatus(@NonNull ExecutionStatus executionStatus);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index 3918ad8386..971258499d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -3,6 +3,7 @@ package org.apache.dolphinscheduler.dao.repository;
 import lombok.NonNull;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.exception.RepositoryException;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
 import java.util.List;
 
@@ -12,6 +13,9 @@ public interface TaskInstanceDao {
 
     List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId);
 
+    List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceIdAndStatus(@NonNull Integer workflowInstanceId,
+                                                                               @NonNull ExecutionStatus status);
+
     /**
      * Update the taskInstance, if update failed will throw exception.
      *
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
index 79297445d1..2365b5f2bb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
@@ -1,6 +1,7 @@
 package org.apache.dolphinscheduler.dao.repository.impl;
 
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
 import org.apache.dolphinscheduler.dao.repository.CommandDao;
@@ -22,4 +23,10 @@ public class CommandDaoImpl implements CommandDao {
         }
         commandMapper.batchInsertCommand(commands);
     }
+
+    @Override
+    public List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(long workflowInstanceId) {
+        return commandMapper.queryRecoveryCoronationCommandByWorkflowInstanceId(workflowInstanceId,
+                CommandType.RECOVERY_FROM_CORONATION_PAUSE_TASKS.getCode());
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
new file mode 100644
index 0000000000..893b28fbbb
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
@@ -0,0 +1,65 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.mapper.CoronationTaskMapper;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Repository
+public class CoronationTaskDaoImpl implements CoronationTaskDao {
+
+    @Autowired
+    private CoronationTaskMapper coronationTaskMapper;
+
+    @Override
+    public IPage<CoronationTask> pageQueryCoronationTask(String workflowInstanceName, String taskName,
+                                                         @NonNull Integer pageNo,
+                                                         @NonNull Integer pageSize) {
+        Page<IsolationTask> page = new Page<>(pageNo, pageSize);
+        return coronationTaskMapper.pageQueryCoronationTask(page, workflowInstanceName, taskName);
+    }
+
+    @Override
+    public List<CoronationTaskDTO> queryAllCoronationTasks() {
+        return coronationTaskMapper.queryAllCoronationTasks()
+                .stream()
+                .map(CoronationTaskDTO::new)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Optional<CoronationTask> queryCoronationTaskById(long id) {
+        return Optional.ofNullable(coronationTaskMapper.selectById(id));
+    }
+
+    @Override
+    public int deleteById(long id) {
+        return coronationTaskMapper.deleteById(id);
+    }
+
+    @Override
+    public void batchInsert(List<CoronationTask> coronationTasks) {
+        if (CollectionUtils.isEmpty(coronationTasks)) {
+            return;
+        }
+        coronationTaskMapper.batchInsert(coronationTasks);
+    }
+
+    @Override
+    public int queryAllCoronationTaskNumber() {
+        return coronationTaskMapper.queryAllCoronationTaskNumber();
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
index effab3aa16..0608adba0b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
@@ -5,7 +5,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
 import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper;
 import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
@@ -29,7 +28,12 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
                                                        int pageNumber,
                                                        int pageSize) {
         Page<IsolationTask> page = new Page<>(pageNumber, pageSize);
-        return isolationTaskMapper.pageQuery(workflowInstanceName, taskName, page);
+        return isolationTaskMapper.pageQuery(page, workflowInstanceName, taskName);
+    }
+
+    @Override
+    public List<IsolationTask> queryAllIsolationTask() {
+        return isolationTaskMapper.queryAllIsolationTask();
     }
 
     @Override
@@ -41,9 +45,8 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
     }
 
     @Override
-    public List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId,
-                                                         IsolationTaskStatus isolationTaskStatus) {
-        return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId, isolationTaskStatus.getCode());
+    public List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId) {
+        return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId);
     }
 
     @Override
@@ -52,18 +55,8 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
     }
 
     @Override
-    public List<IsolationTask> queryByIds(List<Long> isolationTaskIds) {
-        return isolationTaskMapper.selectBatchIds(isolationTaskIds);
-    }
-
-    @Override
-    public List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus) {
-        return isolationTaskMapper.queryByStatus(isolationTaskStatus.getCode());
-    }
-
-    @Override
-    public int deleteByIdAndStatus(long id, IsolationTaskStatus status) {
-        return isolationTaskMapper.deleteByIdAndStatus(id, status.getCode());
+    public int deleteById(long id) {
+        return isolationTaskMapper.deleteById(id);
     }
 
     @Override
@@ -71,11 +64,6 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
         isolationTaskMapper.insert(isolationTask);
     }
 
-    @Override
-    public void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus) {
-        isolationTaskMapper.updateIsolationTaskStatus(isolationTaskId, isolationTaskStatus.getCode());
-    }
-
     @Override
     public void batchInsert(List<IsolationTask> isolationTasks) {
         if (CollectionUtils.isEmpty(isolationTasks)) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
index 72dce0b2c3..637b10ad5d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
@@ -7,6 +7,8 @@ import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 
+import java.util.Optional;
+
 @Repository
 public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
 
@@ -14,7 +16,7 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
     private ProcessDefinitionMapper processDefinitionMapper;
 
     @Override
-    public ProcessDefinition queryProcessDefinitionByCode(@NonNull Long processDefinitionCode) {
-        return processDefinitionMapper.queryByCode(processDefinitionCode);
+    public Optional<ProcessDefinition> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode) {
+        return Optional.ofNullable(processDefinitionMapper.queryByCode(processDefinitionCode));
     }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index 84ab280c7f..338566e67e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -23,11 +23,13 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 @Slf4j
 @Repository
@@ -45,8 +47,8 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
     }
 
     @Override
-    public ProcessInstance queryProcessInstanceById(@NonNull Integer processInstanceId) {
-        return processInstanceMapper.selectById(processInstanceId);
+    public Optional<ProcessInstance> queryProcessInstanceById(@NonNull Integer processInstanceId) {
+        return Optional.ofNullable(processInstanceMapper.selectById(processInstanceId));
     }
 
     @Override
@@ -67,4 +69,9 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
             return insertProcessInstance(processInstance);
         }
     }
+
+    @Override
+    public List<ProcessInstance> queryProcessInstanceByStatus(@NonNull ExecutionStatus executionStatus) {
+        return processInstanceMapper.queryByStatus(executionStatus.getCode());
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index 0512980b10..9b07088810 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -8,6 +8,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.exception.RepositoryException;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 
@@ -34,6 +35,13 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
         return taskInstanceMapper.findValidTaskListByProcessId(workflowInstanceId, Flag.YES);
     }
 
+    @Override
+    public List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceIdAndStatus(@NonNull Integer workflowInstanceId,
+                                                                                      @NonNull ExecutionStatus status) {
+        return taskInstanceMapper.findValidTaskListByProcessIdAndTaskStatus(workflowInstanceId, status.getCode(),
+                Flag.YES.getCode());
+    }
+
     @Override
     public void updateTaskInstance(@NonNull TaskInstance taskInstance) throws RepositoryException {
         try {
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
new file mode 100644
index 0000000000..0967fa71ec
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.CoronationTaskMapper">
+    <sql id="baseSql">
+        id
+        ,workflow_instance_id, workflow_instance_name, task_name, task_code, forbidden_upstream_tasks, create_time, update_time
+    </sql>
+
+    <select id="pageQueryCoronationTask" resultType="org.apache.dolphinscheduler.dao.entity.CoronationTask">
+        select
+        <include refid="baseSql"/>
+        from t_ds_coronation_task
+        where 1 = 1
+        <if test="workflowInstanceName != null and workflowInstanceName != ''">
+            and workflow_instance_name = #{workflowInstanceName}
+        </if>
+        <if test="taskName != null and taskName != ''">
+            and task_name = #{taskName}
+        </if>
+    </select>
+
+    <select id="queryAllCoronationTasks" resultType="org.apache.dolphinscheduler.dao.entity.CoronationTask">
+        select
+        <include refid="baseSql"/>
+        from t_ds_coronation_task
+    </select>
+
+    <select id="queryAllCoronationTaskNumber" resultType="java.lang.Integer">
+        select count(1) from t_ds_coronation_task
+    </select>
+
+    <insert id="batchInsert">
+        insert into t_ds_coronation_task (
+        workflow_instance_id,
+        workflow_instance_name,
+        task_name,
+        task_code,
+        forbidden_upstream_tasks
+        )
+        values
+        <foreach collection="coronationTasks" item="coronationTask" separator=",">
+            (
+            #{coronationTask.workflowInstanceId},
+            #{coronationTask.workflowInstanceName},
+            #{coronationTask.taskName},
+            #{coronationTask.taskCode},
+            #{coronationTask.forbiddenUpstreamTasks}
+            )
+        </foreach>
+    </insert>
+
+</mapper>
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
index 4a7100010e..390591be26 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
@@ -20,18 +20,18 @@
 <mapper namespace="org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper">
     <sql id="baseSql">
         id
-        ,workflow_instance_id, workflow_instance_name, task_name, task_code, status, create_time, update_time
+        ,workflow_instance_id, workflow_instance_name, task_name, task_code, create_time, update_time
     </sql>
 
-    <select id="pageQuery" resultType="string">
+    <select id="pageQuery" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
         select
         <include refid="baseSql"/>
         from t_ds_isolation_task
         where 1 = 1
-        <if test="workflowInstanceName != null">
+        <if test="workflowInstanceName != null and workflowInstanceName != ''">
             and workflow_instance_name = #{workflowInstanceName}
         </if>
-        <if test="taskName != null">
+        <if test="taskName != null and taskName != ''">
             and task_name = #{taskName}
         </if>
     </select>
@@ -51,29 +51,21 @@
         select
         <include refid="baseSql"/>
         from t_ds_isolation_task
-        where workflow_instance_id = #{workflowInstanceId} and status = #{status}
+        where workflow_instance_id = #{workflowInstanceId}
     </select>
 
-    <select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+    <select id="queryAllIsolationTask" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
         select
         <include refid="baseSql"/>
         from t_ds_isolation_task
-        where  status = #{status}
     </select>
 
-    <update id="updateIsolationTaskStatus">
-        update t_ds_isolation_task
-        set status = #{status}
-        where id = #{id}
-    </update>
-
     <insert id="batchInsert">
         insert into t_ds_isolation_task (
         workflow_instance_id,
         workflow_instance_name,
         task_name,
-        task_code,
-        status
+        task_code
         )
         values
         <foreach collection="isolationTasks" item="isolationTask" separator=",">
@@ -81,17 +73,9 @@
             #{isolationTask.workflowInstanceId},
             #{isolationTask.workflowInstanceName},
             #{isolationTask.taskName},
-            #{isolationTask.taskCode},
-            #{isolationTask.status}
+            #{isolationTask.taskCode}
             )
         </foreach>
     </insert>
 
-    <delete id="deleteByIdAndStatus">
-        delete
-        from t_ds_isolation_task
-        where id = #{id}
-          and status = #{status}
-    </delete>
-
 </mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 336b966487..7c63a84e2c 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -22,7 +22,7 @@
         id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
         start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
         flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
-        first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id
+        first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id, runtime_context
     </sql>
     <sql id="baseSqlV2">
         ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index 8f6df6738c..134a260a45 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -864,6 +864,7 @@ CREATE TABLE t_ds_task_instance
     task_group_id           int(11) DEFAULT NULL,
     var_pool                longtext,
     dry_run                 int NULL DEFAULT 0,
+    runtime_context         text,
     PRIMARY KEY (id),
     FOREIGN KEY (process_instance_id) REFERENCES t_ds_process_instance (id) ON DELETE CASCADE
 );
@@ -1989,8 +1990,24 @@ CREATE TABLE `t_ds_isolation_task`
     `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
     `task_name`              varchar(255) NOT NULL COMMENT 'isolation task name',
     `task_code`              bigint       NOT NULL COMMENT 'isolation task code',
-    `status`                 tinyint(4) DEFAULT '0',
     `create_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP,
     `update_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
     PRIMARY KEY (`id`)
 );
+
+-- ----------------------------
+-- Table structure for t_ds_coronation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_coronation_task`;
+CREATE TABLE `t_ds_coronation_task`
+(
+    `id`                       bigint       NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `workflow_instance_id`     bigint       NOT NULL COMMENT 'workflowInstanceId of the coronation task',
+    `workflow_instance_name`   varchar(255) NOT NULL COMMENT 'workflowInstanceName of the coronation task',
+    `task_name`                varchar(255) NOT NULL COMMENT 'coronation task name',
+    `task_code`                bigint       NOT NULL COMMENT 'coronation task code',
+    `forbidden_upstream_tasks` text COMMENT 'Tasks listed in forbidden_upstream_tasks are added to the forbidden execute list',
+    `create_time`              timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+    `update_time`              timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`)
+);
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index e5b69db827..d202084ec2 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -344,7 +344,8 @@ CREATE TABLE `t_ds_command` (
   `environment_code`          bigint(20) DEFAULT '-1' COMMENT 'environment code',
   `dry_run`                   tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run',
   PRIMARY KEY (`id`),
-  KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE
+  KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE,
+  KEY `workflow_instance_id_index` (`process_instance_id`,`command_type`) USING BTREE
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
 -- ----------------------------
@@ -859,6 +860,7 @@ CREATE TABLE `t_ds_task_instance` (
   `var_pool` longtext COMMENT 'var_pool',
   `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
   `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag: 0 normal, 1 dry run',
+  `runtime_context` text COMMENT 'used to store some information used in runtime',
   PRIMARY KEY (`id`),
   KEY `process_instance_id` (`process_instance_id`) USING BTREE,
   KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE,
@@ -1956,9 +1958,26 @@ CREATE TABLE `t_ds_isolation_task`
     `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
     `task_name`              varchar(255) NOT NULL COMMENT 'isolation task name',
     `task_code`              bigint       NOT NULL COMMENT 'isolation task code',
-    `status`                 tinyint(4) DEFAULT '0',
     `create_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP,
     `update_time`            timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
     PRIMARY KEY (`id`),
-    KEY `workflow_instance_index` (`workflow_instance_id`) USING BTREE
+    UNIQUE KEY `workflow_instance_task__unique` (`workflow_instance_id`,`task_code`)
+) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
+
+-- ----------------------------
+-- Table structure for t_ds_coronation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_coronation_task`;
+CREATE TABLE `t_ds_coronation_task`
+(
+    `id`                       bigint       NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `workflow_instance_id`     bigint       NOT NULL COMMENT 'workflowInstanceId of the coronation task',
+    `workflow_instance_name`   varchar(255) NOT NULL COMMENT 'workflowInstanceName of the coronation task',
+    `task_name`                varchar(255) NOT NULL COMMENT 'coronation task name',
+    `task_code`                bigint       NOT NULL COMMENT 'coronation task code',
+    `forbidden_upstream_tasks` text COMMENT 'Tasks listed in forbidden_upstream_tasks are added to the forbidden execute list',
+    `create_time`              timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+    `update_time`              timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `workflow_instance_task__unique` (`workflow_instance_id`,`task_code`)
 ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
similarity index 52%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
index 826a456fd8..1a88860490 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
@@ -1,26 +1,26 @@
-package org.apache.dolphinscheduler.server.master.processor.isolation;
+package org.apache.dolphinscheduler.server.master.processor.coronation;
 
 import io.netty.channel.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class RefreshIsolationTaskProcessor implements NettyRequestProcessor {
+public class RefreshCoronationMetadataProcessor implements NettyRequestProcessor {
 
     @Autowired
-    private IsolationTaskManager isolationTaskManager;
+    private CoronationMetadataManager coronationMetadataManager;
 
     @Override
     public void process(Channel channel, Command command) {
-        if (command.getType() != CommandType.REFRESH_ISOLATION_REQUEST) {
-            throw new IllegalArgumentException(String.format("The current rpc command: %s is invalidated", command));
+        if (command.getType() != CommandType.REFRESH_CORONATION_METADATA_REQUEST) {
+            throw new IllegalArgumentException(String.format("The current rpc command: %s is invalid", command));
         }
-        isolationTaskManager.refreshIsolationTaskMapFromDB();
+        coronationMetadataManager.refreshCoronationTaskMetadata();
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationMetadataProcessor.java
similarity index 71%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationMetadataProcessor.java
index 826a456fd8..19dd54211e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationMetadataProcessor.java
@@ -5,22 +5,22 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class RefreshIsolationTaskProcessor implements NettyRequestProcessor {
+public class RefreshIsolationMetadataProcessor implements NettyRequestProcessor {
 
     @Autowired
-    private IsolationTaskManager isolationTaskManager;
+    private IsolationMetadataManager isolationTaskManager;
 
     @Override
     public void process(Channel channel, Command command) {
-        if (command.getType() != CommandType.REFRESH_ISOLATION_REQUEST) {
+        if (command.getType() != CommandType.REFRESH_ISOLATION_METADATA_REQUEST) {
             throw new IllegalArgumentException(String.format("The current rpc command: %s is invalidated", command));
         }
-        isolationTaskManager.refreshIsolationTaskMapFromDB();
+        isolationTaskManager.refreshIsolationTaskMetadata();
     }
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
similarity index 91%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
index ef3634cd52..3522ac4fec 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.api.remote;
+package org.apache.dolphinscheduler.server.master.rpc;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -11,13 +11,13 @@ import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class ApiServerRPCClient {
+public class MasterRPCClient {
 
     private final NettyRemotingClient client;
 
     private static final long DEFAULT_TIME_OUT_MILLS = 1_000L;
 
-    public ApiServerRPCClient() {
+    public MasterRPCClient() {
         client = new NettyRemotingClient(new NettyClientConfig());
-        log.info("Success initialized ApiServerRPCClient...");
+        log.info("Success initialized MasterRPCClient...");
     }
@@ -32,4 +32,5 @@ public class ApiServerRPCClient {
                                 long timeoutMills) throws RemotingException, InterruptedException {
         client.sendSync(host, rpcCommand, timeoutMills);
     }
+
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index eb4d82cd3d..4644611a7f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.rpc;
 
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -31,9 +30,8 @@ import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningPro
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
 
-import javax.annotation.PostConstruct;
-
-import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationTaskProcessor;
+import org.apache.dolphinscheduler.server.master.processor.coronation.RefreshCoronationMetadataProcessor;
+import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationMetadataProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -76,8 +74,11 @@ public class MasterRPCServer implements AutoCloseable {
     @Autowired
     private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
 
+    @Autowired
+    private RefreshCoronationMetadataProcessor refreshCoronationMetadataProcessor;
+
     @Autowired
-    private RefreshIsolationTaskProcessor refreshIsolationTaskProcessor;
+    private RefreshIsolationMetadataProcessor refreshIsolationTaskProcessor;
 
     public void start() {
         logger.info("Starting Master RPC Server...");
@@ -101,8 +102,10 @@ public class MasterRPCServer implements AutoCloseable {
         this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
 
-        this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_REQUEST,
+        this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_METADATA_REQUEST,
                 refreshIsolationTaskProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_CORONATION_METADATA_REQUEST,
+                refreshCoronationMetadataProcessor);
 
         this.nettyRemotingServer.start();
         logger.info("Started Master RPC Server...");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 6446248c06..a72b39612d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -22,7 +22,8 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
 import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,10 @@ public class FailoverExecuteThread extends BaseDaemonThread {
     private MasterFailoverService masterFailoverService;
 
     @Autowired
-    private IsolationTaskManager isolationTaskManager;
+    private IsolationMetadataManager isolationMetadataManager;
+
+    @Autowired
+    protected CoronationMetadataManager coronationMetadataManager;
 
     protected FailoverExecuteThread() {
         super("FailoverExecuteThread");
@@ -70,7 +74,8 @@ public class FailoverExecuteThread extends BaseDaemonThread {
                 // todo: DO we need to schedule a task to do this kind of check
                 // This kind of check may only need to be executed when a master server start
                 masterFailoverService.checkMasterFailover();
-                isolationTaskManager.refreshIsolationTaskMapFromDB();
+                isolationMetadataManager.refreshIsolationTaskMetadata();
+                coronationMetadataManager.refreshCoronationTaskMetadata();
             } catch (Exception e) {
                 logger.error("Master failover thread execute error", e);
             } finally {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 1d823a71a7..45427fb402 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@@ -46,6 +45,8 @@ import org.apache.dolphinscheduler.server.master.exception.MasterException;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -110,7 +111,10 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
     private StateEventCallbackService stateEventCallbackService;
 
     @Autowired
-    private IsolationTaskDao isolationTaskDao;
+    private CoronationMetadataManager coronationMetadataManager;
+
+    @Autowired
+    private IsolationMetadataManager isolationMetadataManager;
 
     private String masterAddress;
 
@@ -189,7 +193,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
                                 masterConfig,
                                 stateWheelExecuteThread,
                                 curingGlobalParamsService,
-                                isolationTaskDao);
+                                isolationMetadataManager,
+                                coronationMetadataManager);
                         processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                         workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                                 processInstance.getId()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 24d5d31451..b284c80ef2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -37,10 +37,10 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.dto.TaskRuntimeContext;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.Environment;
-import org.apache.dolphinscheduler.dao.entity.IsolationTask;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
@@ -49,7 +49,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@@ -68,6 +67,8 @@ import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
 import org.apache.dolphinscheduler.server.master.utils.TaskInstanceUtils;
 import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@@ -191,8 +192,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     private final Set<String> stateCleanTaskCodes = new HashSet<>();
 
-    // isolationTaskCode -> isolatedTimes
-    private final Map<Long, Integer> isolationTaskCodesToTimesMap = new HashMap<>();
+    // TaskCode -> isolatedTimes
+    // This map stores the tasks that will be isolated and the number of times each has been isolated.
+    private final Map<Long, Integer> isolatedTaskCodesToTimesMap = new HashMap<>();
+    // TaskCode -> coronattedTimes
+    private final Map<Long, Integer> coronattedTaskCodeToTimesMap = new HashMap<>();
 
     /**
      * state event queue
@@ -214,7 +218,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     private final CuringParamsService curingParamsService;
 
-    private final IsolationTaskDao isolationTaskDao;
+    private final IsolationMetadataManager isolationMetadataManager;
+    private final CoronationMetadataManager coronationMetadataManager;
 
     private final String masterAddress;
 
@@ -236,7 +241,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                                    @NonNull MasterConfig masterConfig,
                                    @NonNull StateWheelExecuteThread stateWheelExecuteThread,
                                    @NonNull CuringParamsService curingParamsService,
-                                   @NonNull IsolationTaskDao isolationTaskDao) {
+                                   @NonNull IsolationMetadataManager isolationMetadataManager,
+                                   @NonNull CoronationMetadataManager coronationMetadataManager) {
         this.processService = processService;
         this.processInstanceDao = processInstanceDao;
         this.processInstance = processInstance;
@@ -244,8 +250,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         this.processAlertManager = processAlertManager;
         this.stateWheelExecuteThread = stateWheelExecuteThread;
         this.curingParamsService = curingParamsService;
-        this.isolationTaskDao = isolationTaskDao;
+        this.isolationMetadataManager = isolationMetadataManager;
         this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
+        this.coronationMetadataManager = coronationMetadataManager;
         TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
     }
 
@@ -393,6 +400,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 if (!processInstance.isBlocked()) {
                     submitPostNode(Long.toString(taskInstance.getTaskCode()));
                 }
+                if (coronationMetadataManager.isInCoronationMode()
+                        && coronattedTaskCodeToTimesMap.containsKey(taskInstance.getTaskCode())) {
+                    // The current task is a coronation task and has succeeded,
+                    // so we need to delete the coronation metadata in the db and send a request to all masters
+                    coronationMetadataManager.deleteCoronationMetadataInDB(taskInstance.getTaskCode());
+                    coronattedTaskCodeToTimesMap.remove(taskInstance.getTaskCode());
+                }
             } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
                 // retry task
                 logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
@@ -453,31 +467,35 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
     }
 
-    public void onlineTaskIsolation(long taskCode) {
+    public void addTaskIsolation(long taskCode) {
         try {
             LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
             // find the post running task
-            Set<String> needToOnlineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
-            // if the current task is finished, kill the post task
-            // we need to submit an online isolation task event, otherwise there may exist concurrent problem
-            for (String isolationTaskCodeStr : needToOnlineIsolationTaskCodes) {
-                Long isolationTaskCode = Long.valueOf(isolationTaskCodeStr);
-                isolationTaskCodesToTimesMap.put(isolationTaskCode,
-                        isolationTaskCodesToTimesMap.getOrDefault(isolationTaskCode, 0) + 1);
-                ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
-                if (iTaskProcessor == null) {
-                    logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}",
-                            isolationTaskCode);
-                    continue;
-                }
-                iTaskProcessor.action(TaskAction.ISOLATE);
-                StateEvent stateEvent = new StateEvent();
-                stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-                stateEvent.setProcessInstanceId(this.processInstance.getId());
-                stateEvent.setTaskInstanceId(iTaskProcessor.taskInstance().getId());
-                stateEvent.setExecutionStatus(iTaskProcessor.taskInstance().getState());
-                this.addStateEvent(stateEvent);
-            }
+            DagHelper.getAllPostNodes(Long.toString(taskCode), dag)
+                    .stream()
+                    .map(Long::parseLong)
+                    .forEach(isolatedTaskCode -> {
+                        // if the current task is finished, kill the post task
+                        // we need to submit an online isolation task event, otherwise there may be
+                        // concurrency problems
+                        isolatedTaskCodesToTimesMap.put(isolatedTaskCode,
+                                isolatedTaskCodesToTimesMap.getOrDefault(isolatedTaskCode, 0) + 1);
+                        ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolatedTaskCode);
+                        if (iTaskProcessor == null) {
+                            logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}",
+                                    isolatedTaskCode);
+                            return;
+                        }
+                        iTaskProcessor.action(TaskAction.ISOLATE);
+                        StateEvent stateEvent = new StateEvent();
+                        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+                        stateEvent.setProcessInstanceId(this.processInstance.getId());
+                        stateEvent.setTaskInstanceId(iTaskProcessor.taskInstance().getId());
+                        stateEvent.setExecutionStatus(iTaskProcessor.taskInstance().getState());
+                        this.addStateEvent(stateEvent);
+
+                    });
+
         } finally {
             LoggerUtils.removeWorkflowInstanceIdMDC();
         }
@@ -486,12 +504,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     public void cancelTaskIsolation(long taskCode) {
         try {
             LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-            // todo:
             // restart the killed/ paused task
-            Set<String> needToOfflineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
-            for (String needToOfflineIsolationTaskCodeStr : needToOfflineIsolationTaskCodes) {
-                Long isolationTaskCode = Long.valueOf(needToOfflineIsolationTaskCodeStr);
-                Integer isolateTimes = isolationTaskCodesToTimesMap.get(isolationTaskCode);
+            Set<String> needToCancelIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
+            for (String needToCancelIsolationTaskCodeStr : needToCancelIsolationTaskCodes) {
+                Long isolationTaskCode = Long.valueOf(needToCancelIsolationTaskCodeStr);
+                Integer isolateTimes = isolatedTaskCodesToTimesMap.get(isolationTaskCode);
                 if (isolateTimes == null) {
                     logger.warn(
                             "The current task has not been isolated, so it don't need to offline isolation, taskCode: {}",
@@ -499,7 +516,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                     continue;
                 }
                 if (isolateTimes == 1) {
-                    isolationTaskCodesToTimesMap.remove(isolationTaskCode);
+                    isolatedTaskCodesToTimesMap.remove(isolationTaskCode);
                     ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
                     if (iTaskProcessor == null) {
                         // the current task has not been submitted
@@ -521,7 +538,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                         continue;
                     }
                 } else {
-                    isolationTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1);
+                    isolatedTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1);
                 }
 
             }
@@ -899,15 +916,28 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
         // generate process dag
         dag = DagHelper.buildDagGraph(processDag);
-        List<IsolationTask> isolationTasks =
-                isolationTaskDao.queryByWorkflowInstanceId(processInstance.getId(), IsolationTaskStatus.ONLINE);
-        for (IsolationTask isolationTask : isolationTasks) {
-            Set<String> allPostNodes = DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag);
-            allPostNodes.forEach(postNode -> {
-                isolationTaskCodesToTimesMap.put(isolationTask.getTaskCode(),
-                        isolationTaskCodesToTimesMap.getOrDefault(isolationTask.getTaskCode(), 0) + 1);
-            });
-        }
+        isolationMetadataManager.queryIsolationTasksByWorkflowInstanceId(processInstance.getId())
+                .forEach(isolationTask -> {
+                    DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag)
+                            .stream()
+                            .map(Long::parseLong)
+                            .forEach(taskCode -> {
+                                isolatedTaskCodesToTimesMap.put(
+                                        taskCode,
+                                        isolatedTaskCodesToTimesMap.getOrDefault(taskCode, 0) + 1);
+                            });
+                });
+        coronationMetadataManager.getCoronationTasksByWorkflowInstanceId(processInstance.getId())
+                .forEach(coronationTask -> {
+                    DagHelper.getAllPreNodes(Long.toString(coronationTask.getTaskCode()), dag)
+                            .stream()
+                            .map(Long::parseLong)
+                            .forEach(taskCode -> {
+                                coronattedTaskCodeToTimesMap.put(
+                                        taskCode,
+                                        coronattedTaskCodeToTimesMap.getOrDefault(taskCode, 0) + 1);
+                            });
+                });
         logger.info("Build dag success, dag: {}", dag);
     }
 
@@ -1043,11 +1073,28 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      */
     private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
         try {
-            if (isolationTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) {
+            // todo: use TaskInstanceDTO
+            TaskRuntimeContext taskRuntimeContext = new TaskRuntimeContext();
+            if (isolatedTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) {
+                taskRuntimeContext.setIsolationTask(
+                        isolationMetadataManager.isIsolatedTask(processInstance.getId(), taskInstance.getTaskCode()));
+                taskRuntimeContext.setHasBeenIsolated(true);
                 taskInstance.setState(ExecutionStatus.PAUSE_BY_ISOLATION);
                 logger.info("The current task has been isolated, will set status to PAUSE_BY_ISOLATION, taskCode: {}",
                         taskInstance.getTaskCode());
             }
+            if (coronationMetadataManager.isInCoronationMode()) {
+                if (!coronattedTaskCodeToTimesMap.containsKey(taskInstance.getTaskCode())) {
+                    taskInstance.setState(ExecutionStatus.PAUSE_BY_CORONATION);
+                    logger.info(
+                            "The current server mode is in coronation, and the task: {} is not in coronation list, will pause it",
+                            taskInstance.getTaskCode());
+                }
+                taskRuntimeContext.setCoronationTask(coronationMetadataManager.isCoronationTask(processInstance.getId(),
+                        taskInstance.getTaskCode()));
+                taskRuntimeContext.setHasBeenCoronatted(true);
+            }
+            taskInstance.setRuntimeContext(JSONUtils.toJsonString(taskRuntimeContext));
 
             // package task instance before submit
             processService.packageTaskInstance(taskInstance, processInstance);
@@ -1784,6 +1831,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             if (task == null) {
                 continue;
             }
+            if (coronationMetadataManager.isInCoronationMode()
+                    && skipTaskNodeMap.containsKey(Long.toString(task.getTaskCode()))) {
+                logger.info(
+                        "The current server mode is in coronation, and the task: {} is in skip list, will ignore this task",
+                        task.getTaskCode());
+                continue;
+            }
+
             // stop tasks which is retrying if forced success happens
             if (task.taskCanRetry()) {
                 TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
@@ -1921,6 +1976,65 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
     }
 
+    /**
+     * Add the task to the coronation list; if the task is paused by coronation, it will be recovered.
+     * If the task is in the skip list, the coronation is ignored.
+     */
+    public void onlineTaskCoronation(CoronationTaskDTO coronationTask) {
+        // set the forbidden task nodes
+        long taskCode = coronationTask.getTaskCode();
+        coronationTask.getForbiddenUpstreamTasks().forEach(forbiddenTaskNode -> {
+            String forbiddenTaskNodeStr = Long.toString(forbiddenTaskNode.getTaskCode());
+            skipTaskNodeMap.put(forbiddenTaskNodeStr, dag.getNode(forbiddenTaskNodeStr));
+        });
+        // find the upstream task
+        Set<String> parentNodes = DagHelper.getAllPreNodes(Long.toString(taskCode), dag);
+        for (String parentNodeCodeStr : parentNodes) {
+            long parentNodeCode = Long.parseLong(parentNodeCodeStr);
+            if (skipTaskNodeMap.containsKey(parentNodeCodeStr)) {
+                logger.info("The parent node: {} is in skip list, no need to coronation", parentNodeCodeStr);
+                continue;
+            }
+            coronattedTaskCodeToTimesMap.put(parentNodeCode,
+                    coronattedTaskCodeToTimesMap.getOrDefault(parentNodeCode, 0) + 1);
+            Integer parentNodeInstanceId = validTaskMap.get(parentNodeCode);
+            if (parentNodeInstanceId != null) {
+                TaskInstance taskInstance = activeTaskProcessorMaps.get(parentNodeCode).taskInstance();
+                if (taskInstance.getState().typeIsPauseByCoronation()) {
+                    // re-add the task to the standby list so that it will be submitted again
+                    taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+                    logger.info(
+                            "The current task: {} has been added to coronation, will recover from pause_by_coronation, now set the status to submitted_success, it will be submitted again",
+                            parentNodeCode);
+                    addTaskToStandByList(taskInstance);
+                    continue;
+                }
+            } else {
+                logger.info("The current task: {} has been added to coronation", parentNodeCode);
+            }
+        }
+    }
+
+    /**
+     * Cancel the coronation task; this only removes the task from the coronation list.
+     */
+    public void cancelTaskCoronation(CoronationTaskDTO coronationTask) {
+        Set<String> parentNodes = DagHelper.getAllPreNodes(Long.toString(coronationTask.getTaskCode()), dag);
+        for (String parentNodeStr : parentNodes) {
+            long parentNode = Long.parseLong(parentNodeStr);
+            Integer coronationTimes = coronattedTaskCodeToTimesMap.get(parentNode);
+            if (coronationTimes == null) {
+                continue;
+            }
+            if (coronationTimes == 1) {
+                logger.info("The current task: {} has been removed from coronation list", parentNode);
+                coronattedTaskCodeToTimesMap.remove(parentNode);
+                continue;
+            }
+            coronattedTaskCodeToTimesMap.put(parentNode, coronationTimes - 1);
+        }
+    }
+
     private enum WorkflowRunnableStatus {
         CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED,
         ;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
new file mode 100644
index 0000000000..68aba94bda
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
@@ -0,0 +1,273 @@
+package org.apache.dolphinscheduler.server.master.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.coronation.RefreshCoronationMetadataRequest;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRPCClient;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.transformer.MasterCommandTransformer;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+public class CoronationMetadataManager {
+
+    @Autowired
+    private CoronationTaskDao coronationTaskDao;
+
+    @Autowired
+    private RegistryClient registryClient;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    @Autowired
+    private MasterCommandTransformer commandTransformer;
+
+    @Autowired
+    private CommandDao commandDao;
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private MasterRPCClient masterRPCClient;
+
+    private volatile CoronationMode coronationMode = CoronationMode.NOT_IN_CORONATION;
+
+    // coronationId -> coronationTask
+    private volatile Map<Long, CoronationTaskDTO> coronationTaskInMemory = new HashMap<>();
+    private final Map<Integer, Set<CoronationTaskDTO>> workflowCoronationTaskMap = new HashMap<>();
+    // workflowInstanceId -> coronationTaskCodes
+    private final Map<Integer, Set<Long>> workflowCoronationTaskCodeMap = new HashMap<>();
+
+    @PostConstruct
+    public void init() {
+        refreshCoronationTaskMetadata();
+    }
+
+    public synchronized void refreshCoronationTaskMetadata() {
+        StopWatch stopWatch = StopWatch.createStarted();
+        Map<Long, CoronationTaskDTO> coronationTaskInDB = coronationTaskDao.queryAllCoronationTasks()
+                .stream()
+                .collect(Collectors.toMap(CoronationTaskDTO::getId, Function.identity()));
+
+        List<CoronationTaskDTO> addCoronationTasks = coronationTaskInDB.values()
+                .stream()
+                .filter(coronationTask -> !coronationTaskInMemory.containsKey(coronationTask.getId()))
+                .collect(Collectors.toList());
+        List<CoronationTaskDTO> deleteCoronationTasks = coronationTaskInMemory.values()
+                .stream()
+                .filter(coronationTask -> !coronationTaskInDB.containsKey(coronationTask.getId()))
+                .collect(Collectors.toList());
+        coronationTaskInMemory = coronationTaskInDB;
+        if (coronationTaskInMemory.isEmpty()) {
+            if (coronationMode == CoronationMode.IN_CORONATION) {
+                log.info("There are no coronation tasks, will begin to close coronation mode...");
+                closeCoronation();
+                coronationMode = CoronationMode.NOT_IN_CORONATION;
+                log.info("Close coronation mode success...");
+            }
+        } else {
+            addCoronationTasks(addCoronationTasks);
+            cancelCoronationTasks(deleteCoronationTasks);
+            if (coronationMode == CoronationMode.NOT_IN_CORONATION) {
+                coronationMode = CoronationMode.IN_CORONATION;
+                log.info("Open the coronation mode...");
+            }
+        }
+        stopWatch.stop();
+        log.info("Refresh coronation task from DB finished, cost: {} ms", stopWatch.getTime());
+    }
+
+    public boolean isCoronationTask(int workflowInstanceId, long taskCode) {
+        Set<Long> taskCodes = workflowCoronationTaskCodeMap.get(workflowInstanceId);
+        if (taskCodes == null) {
+            return false;
+        }
+        return taskCodes.contains(taskCode);
+    }
+
+    public void deleteCoronationMetadataInDB(long taskCode) {
+        Optional<CoronationTaskDTO> successCoronationTask = coronationTaskInMemory.values()
+                .stream()
+                .filter(coronationTask -> coronationTask.getTaskCode() == taskCode)
+                .findAny();
+        if (!successCoronationTask.isPresent()) {
+            return;
+        }
+        coronationTaskDao.deleteById(successCoronationTask.get().getId());
+        if (coronationTaskDao.queryAllCoronationTaskNumber() == 0) {
+            // send a refresh coronation metadata request to all masters
+            try {
+                log.info("There are no coronation tasks, will send a refresh coronation metadata request to all masters");
+                List<Server> masters = registryClient.getServerList(NodeType.MASTER);
+                RefreshCoronationMetadataRequest request = new RefreshCoronationMetadataRequest();
+                for (Server master : masters) {
+                    try {
+                        masterRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
+                                request.convert2Command());
+                    } catch (Exception e) {
+                        log.error(
+                                "Close coronation failed, send refresh coronation metadata request to master: {} error",
+                                master, e);
+                    }
+                }
+            } catch (Exception ex) {
+                log.error(
+                        "Close coronation failed, meet an unknown exception, will wait for the master to auto refresh the coronation metadata",
+                        ex);
+            }
+        }
+    }
+
+    private void closeCoronation() {
+        // The current server is in coronation mode, need to close coronation.
+        // Need to acquire a lock to guarantee that only one master recovers the pause_by_coronation workflows
+        try {
+            // block to acquire the master lock
+            if (!registryClient.getLock(NodeType.MASTER.getRegistryPath())) {
+                log.error("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath());
+                return;
+            }
+            // find all the instances that need to be recovered
+            // and create a recovery command for each of them
+            List<Command> needToInsertCommand =
+                    processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_CORONATION)
+                            .stream()
+                            .filter(processInstance -> {
+                                List<Command> commands = commandDao
+                                        .queryRecoveryCoronationCommandByWorkflowInstanceId(processInstance.getId());
+                                // keep only the workflow instances that have not been recovered yet.
+                                return CollectionUtils.isEmpty(commands);
+                            })
+                            .map(processInstance -> {
+                                List<Integer> needToRecoveryTaskInstanceIds = taskInstanceDao
+                                        .queryValidatedTaskInstanceByWorkflowInstanceIdAndStatus(
+                                                processInstance.getId(), ExecutionStatus.PAUSE_BY_CORONATION)
+                                        .stream()
+                                        .map(TaskInstance::getId)
+                                        .collect(Collectors.toList());
+                                return commandTransformer.transformToRecoveryFromCoronationPauseCommand(processInstance,
+                                        needToRecoveryTaskInstanceIds);
+                            })
+                            .collect(Collectors.toList());
+            commandDao.batchInsertCommand(needToInsertCommand);
+        } finally {
+            registryClient.releaseLock(NodeType.MASTER.getRegistryPath());
+        }
+    }
+
+    private void addCoronationTasks(List<CoronationTaskDTO> coronationTasksNeedToOnline) {
+        if (CollectionUtils.isEmpty(coronationTasksNeedToOnline)) {
+            return;
+        }
+        for (CoronationTaskDTO coronationTask : coronationTasksNeedToOnline) {
+            workflowCoronationTaskCodeMap.computeIfAbsent(coronationTask.getWorkflowInstanceId(), k -> new HashSet<>())
+                    .add(coronationTask.getTaskCode());
+            workflowCoronationTaskMap.computeIfAbsent(coronationTask.getWorkflowInstanceId(), k -> new HashSet<>())
+                    .add(coronationTask);
+            WorkflowExecuteRunnable workflowExecuteRunnable =
+                    processInstanceExecCacheManager.getByProcessInstanceId(coronationTask.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                continue;
+            }
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(coronationTask.getWorkflowInstanceId());
+                log.info("Begin to add new coronation task: {}", coronationTask);
+                workflowExecuteRunnable.onlineTaskCoronation(coronationTask);
+            } catch (Exception ex) {
+                log.error("Add new coronation task: {} failed, meet an unknown exception", coronationTask, ex);
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+    private void cancelCoronationTasks(List<CoronationTaskDTO> coronationTasksNeedToOffline) {
+        if (CollectionUtils.isEmpty(coronationTasksNeedToOffline)) {
+            return;
+        }
+        for (CoronationTaskDTO coronationTask : coronationTasksNeedToOffline) {
+            Set<Long> taskCodes = workflowCoronationTaskCodeMap.get(coronationTask.getWorkflowInstanceId());
+            Set<CoronationTaskDTO> coronationTasks =
+                    workflowCoronationTaskMap.get(coronationTask.getWorkflowInstanceId());
+            if (taskCodes != null) {
+                taskCodes.remove(coronationTask.getTaskCode());
+                coronationTasks.remove(coronationTask);
+                if (taskCodes.isEmpty()) {
+                    workflowCoronationTaskCodeMap.remove(coronationTask.getWorkflowInstanceId());
+                    workflowCoronationTaskMap.remove(coronationTask.getWorkflowInstanceId());
+                }
+            }
+            WorkflowExecuteRunnable workflowExecuteRunnable =
+                    processInstanceExecCacheManager.getByProcessInstanceId(coronationTask.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                continue;
+            }
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(coronationTask.getWorkflowInstanceId());
+                log.info("Begin to cancel coronation task: {}", coronationTask);
+                workflowExecuteRunnable.cancelTaskCoronation(coronationTask);
+            } catch (Exception ex) {
+                log.error("Cancel coronation task: {} failed, meet an unknown exception", coronationTask, ex);
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+    public boolean isInCoronationMode() {
+        return coronationMode == CoronationMode.IN_CORONATION;
+    }
+
+    public Set<CoronationTaskDTO> getCoronationTasksByWorkflowInstanceId(int workflowInstanceId) {
+        return workflowCoronationTaskMap.getOrDefault(workflowInstanceId, Collections.emptySet());
+    }
+
+    public enum CoronationMode {
+
+        IN_CORONATION(0, "The current server is already in coronation mode"),
+        NOT_IN_CORONATION(1, "The current server is not in coronation mode"),
+        ;
+
+        private final int code;
+        private final String desc;
+
+        CoronationMode(int code, String desc) {
+            this.code = code;
+            this.desc = desc;
+        }
+
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java
new file mode 100644
index 0000000000..d90a95a42d
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java
@@ -0,0 +1,136 @@
+package org.apache.dolphinscheduler.server.master.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+public class IsolationMetadataManager {
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    private volatile Map<Long, IsolationTask> isolationTasksInMemory = new HashMap<>();
+    private final Map<Integer, Set<Long>> workflowIsolatedTaskCodeMap = new HashMap<>();
+    private final Map<Integer, Set<IsolationTask>> workflowIsolatedTaskMap = new HashMap<>();
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
+    @PostConstruct
+    public void init() {
+        refreshIsolationTaskMetadata();
+    }
+
+    public synchronized void refreshIsolationTaskMetadata() {
+        StopWatch stopWatch = StopWatch.createStarted();
+
+        Map<Long, IsolationTask> isolationTasksInDB =
+                isolationTaskDao.queryAllIsolationTask()
+                        .stream()
+                        .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
+
+        List<IsolationTask> needToCancelIsolationTasks = isolationTasksInMemory.values()
+                .stream()
+                .filter(isolationTask -> !isolationTasksInDB.containsKey(isolationTask.getId()))
+                .collect(Collectors.toList());
+
+        List<IsolationTask> needToAddIsolationTasks = isolationTasksInDB.values()
+                .stream()
+                .filter(isolationTask -> !isolationTasksInMemory.containsKey(isolationTask.getId()))
+                .collect(Collectors.toList());
+
+        isolationTasksInMemory = isolationTasksInDB;
+
+        cancelIsolationTask(needToCancelIsolationTasks);
+        addIsolationTask(needToAddIsolationTasks);
+
+        stopWatch.stop();
+        log.info("Refresh isolation task from db finished, cost: {} ms", stopWatch.getTime());
+    }
+
+    public boolean isIsolatedTask(int workflowInstanceId, long taskCode) {
+        Set<Long> taskCodes = workflowIsolatedTaskCodeMap.get(workflowInstanceId);
+        if (taskCodes == null) {
+            return false;
+        }
+        return taskCodes.contains(taskCode);
+    }
+
+    private void cancelIsolationTask(List<IsolationTask> needCancelIsolationTasks) {
+        if (CollectionUtils.isEmpty(needCancelIsolationTasks)) {
+            return;
+        }
+        for (IsolationTask needCancelIsolation : needCancelIsolationTasks) {
+            Set<Long> taskCodes = workflowIsolatedTaskCodeMap.get(needCancelIsolation.getWorkflowInstanceId());
+            Set<IsolationTask> isolationTasks =
+                    workflowIsolatedTaskMap.get(needCancelIsolation.getWorkflowInstanceId());
+            if (taskCodes != null) {
+                taskCodes.remove(needCancelIsolation.getTaskCode());
+                isolationTasks.remove(needCancelIsolation);
+                if (taskCodes.isEmpty()) {
+                    workflowIsolatedTaskCodeMap.remove(needCancelIsolation.getWorkflowInstanceId());
+                    workflowIsolatedTaskMap.remove(needCancelIsolation.getWorkflowInstanceId());
+                }
+            }
+
+            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
+                    .getByProcessInstanceId(needCancelIsolation.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                continue;
+            }
+            workflowExecuteRunnable.cancelTaskIsolation(needCancelIsolation.getTaskCode());
+            log.info("Backend offline isolation task, isolationTaskId: {}", needCancelIsolation.getId());
+        }
+    }
+
+    private void addIsolationTask(List<IsolationTask> needAddIsolationTasks) {
+        if (CollectionUtils.isEmpty(needAddIsolationTasks)) {
+            return;
+        }
+        for (IsolationTask needAddIsolationTask : needAddIsolationTasks) {
+            workflowIsolatedTaskCodeMap
+                    .computeIfAbsent(needAddIsolationTask.getWorkflowInstanceId(), k -> new HashSet<>())
+                    .add(needAddIsolationTask.getTaskCode());
+            workflowIsolatedTaskMap
+                    .computeIfAbsent(needAddIsolationTask.getWorkflowInstanceId(), k -> new HashSet<>())
+                    .add(needAddIsolationTask);
+            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
+                    .getByProcessInstanceId(needAddIsolationTask.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                continue;
+            }
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(needAddIsolationTask.getWorkflowInstanceId());
+                log.info("Begin to add new isolation task, taskCode: {}", needAddIsolationTask.getTaskCode());
+                workflowExecuteRunnable.addTaskIsolation(needAddIsolationTask.getTaskCode());
+            } catch (Exception ex) {
+                log.error("Add new isolation task: {} failed, meet an unknown exception",
+                        needAddIsolationTask.getTaskCode(), ex);
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+    public Set<IsolationTask> queryIsolationTasksByWorkflowInstanceId(int workflowInstanceId) {
+        return workflowIsolatedTaskMap.getOrDefault(workflowInstanceId, Collections.emptySet());
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
deleted file mode 100644
index 2621766d84..0000000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.dolphinscheduler.server.master.service;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
-import org.apache.dolphinscheduler.dao.entity.IsolationTask;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.PostConstruct;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-@Slf4j
-@Service
-public class IsolationTaskManager {
-
-    @Autowired
-    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
-
-    private volatile Map<Long, IsolationTask> onlineIsolationTasksInMemory = new HashMap<>();
-
-    private volatile Set<Long> currentIsolationTaskIdsInMemory = new HashSet<>();
-
-    @Autowired
-    private IsolationTaskDao isolationTaskDao;
-
-    @PostConstruct
-    public void init() {
-        refreshIsolationTaskMapFromDB();
-    }
-
-    public void refreshIsolationTaskMapFromDB() {
-        Map<Long, IsolationTask> totalOnlineIsolationTasksInDB =
-                isolationTaskDao.queryByStatus(IsolationTaskStatus.ONLINE)
-                        .stream()
-                        .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
-        Set<Long> totalOnlineIsolationTaskCodesInDB = totalOnlineIsolationTasksInDB.keySet();
-
-        Collection<IsolationTask> needToOfflineIsolationTasks =
-                CollectionUtils.subtract(currentIsolationTaskIdsInMemory, totalOnlineIsolationTaskCodesInDB)
-                        .stream()
-                        .map(onlineIsolationTasksInMemory::get)
-                        .collect(Collectors.toList());
-
-        Collection<IsolationTask> needToOnlineIsolationTasks =
-                CollectionUtils.subtract(totalOnlineIsolationTaskCodesInDB, currentIsolationTaskIdsInMemory)
-                        .stream()
-                        .map(totalOnlineIsolationTasksInDB::get)
-                        .collect(Collectors.toList());
-
-        currentIsolationTaskIdsInMemory = totalOnlineIsolationTaskCodesInDB;
-        onlineIsolationTasksInMemory = totalOnlineIsolationTasksInDB;
-
-        offlineIsolationTask(needToOfflineIsolationTasks);
-        onlineIsolationTask(needToOnlineIsolationTasks);
-    }
-
-    private void offlineIsolationTask(Collection<IsolationTask> needOfflineIsolationTasks) {
-        if (CollectionUtils.isEmpty(needOfflineIsolationTasks)) {
-            return;
-        }
-        for (IsolationTask needOfflineIsolation : needOfflineIsolationTasks) {
-            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
-                    .getByProcessInstanceId(needOfflineIsolation.getWorkflowInstanceId());
-            if (workflowExecuteRunnable == null) {
-                continue;
-            }
-            workflowExecuteRunnable.cancelTaskIsolation(needOfflineIsolation.getTaskCode());
-            log.info("Backend offline isolation task, isolationTaskId: {}", needOfflineIsolation.getId());
-        }
-    }
-
-    private void onlineIsolationTask(Collection<IsolationTask> needOnlineIsolationTasks) {
-        if (CollectionUtils.isEmpty(needOnlineIsolationTasks)) {
-            return;
-        }
-        for (IsolationTask needOnlineIsolationTask : needOnlineIsolationTasks) {
-            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
-                    .getByProcessInstanceId(needOnlineIsolationTask.getWorkflowInstanceId());
-            if (workflowExecuteRunnable == null) {
-                continue;
-            }
-            workflowExecuteRunnable.onlineTaskIsolation(needOnlineIsolationTask.getTaskCode());
-            log.info("Backend online isolation task, isolationTaskId: {}", needOnlineIsolationTask.getId());
-        }
-    }
-
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformer.java
new file mode 100644
index 0000000000..86f19925c8
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformer.java
@@ -0,0 +1,14 @@
+package org.apache.dolphinscheduler.server.master.transformer;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+
+import java.util.List;
+
+public interface MasterCommandTransformer {
+
+    Command transformToRecoveryFromCoronationPauseCommand(@NonNull ProcessInstance processInstance,
+                                                          @NonNull List<Integer> needToRecoveryTaskInstanceIds);
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformerImpl.java
new file mode 100644
index 0000000000..d8b900093b
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformerImpl.java
@@ -0,0 +1,45 @@
+package org.apache.dolphinscheduler.server.master.transformer;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+public class MasterCommandTransformerImpl implements MasterCommandTransformer {
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    @Override
+    public Command transformToRecoveryFromCoronationPauseCommand(@NonNull ProcessInstance processInstance,
+                                                                 @NonNull List<Integer> needToRecoveryTaskInstanceIds) {
+        if (CollectionUtils.isEmpty(needToRecoveryTaskInstanceIds)) {
+            throw new IllegalArgumentException("The recovery task instance cannot be empty");
+        }
+
+        Command command = new Command();
+        command.setProcessInstanceId(processInstance.getId());
+        command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+        command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
+        command.setTaskDependType(processInstance.getTaskDependType());
+        command.setFailureStrategy(processInstance.getFailureStrategy());
+        command.setCommandType(CommandType.RECOVERY_FROM_CORONATION_PAUSE_TASKS);
+
+        Map<String, String> map = new HashMap<>();
+        map.put(Constants.CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS,
+                JSONUtils.toJsonString(needToRecoveryTaskInstanceIds));
+        command.setCommandParam(JSONUtils.toJsonString(map));
+        return command;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
index 00ae91019d..4bdacfc3a4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
 import static org.apache.dolphinscheduler.common.Constants.COMMA;
@@ -38,6 +39,8 @@ public class WorkflowInstanceUtils {
                 return getStartTaskInstanceIdsFromStateCleanParam(processInstance);
             case RECOVERY_FROM_ISOLATION_TASKS:
                 return getStartTaskInstanceIdsFromRecoverIsolationParam(processInstance);
+            case RECOVERY_FROM_CORONATION_PAUSE_TASKS:
+                return getStartTaskInstanceIdsFromRecoverCoronationParam(processInstance);
             default:
                 return Collections.emptyList();
         }
@@ -87,6 +90,21 @@ public class WorkflowInstanceUtils {
         return result;
     }
 
+    public static List<Integer> getStartTaskInstanceIdsFromRecoverCoronationParam(@NonNull ProcessInstance processInstance) {
+        Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
+        if (MapUtils.isEmpty(commandParamMap)) {
+            return Collections.emptyList();
+        }
+        List<Integer> coronationTaskInstanceIds =
+                JSONUtils.parseObject(commandParamMap.get(CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS),
+                        new TypeReference<ArrayList<Integer>>() {
+                        });
+        if (coronationTaskInstanceIds == null) {
+            return Collections.emptyList();
+        }
+        return coronationTaskInstanceIds;
+    }
+
     public static List<String> getStartNodeName(@NonNull ProcessInstance processInstance) {
         List<String> startNodeNameList = new ArrayList<>();
         Map<String, String> paramMap = JSONUtils.toMap(processInstance.getCommandParam());
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 856d38cb2f..4e9c9ce9e0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -17,19 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-
-import static org.powermock.api.mockito.PowerMockito.mock;
-
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
-import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
-import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -37,12 +27,25 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.context.ApplicationContext;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -58,15 +61,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.springframework.context.ApplicationContext;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.powermock.api.mockito.PowerMockito.mock;
 
 /**
  * test for WorkflowExecuteThread
@@ -93,7 +92,9 @@ public class WorkflowExecuteRunnableTest {
 
     private CuringParamsService curingGlobalParamsService;
 
-    private IsolationTaskDao isolationTaskDao;
+    private IsolationMetadataManager isolationMetadataManager;
+
+    private CoronationMetadataManager coronationMetadataManager;
 
     @Before
     public void init() throws Exception {
@@ -127,11 +128,12 @@ public class WorkflowExecuteRunnableTest {
         curingGlobalParamsService = mock(CuringParamsService.class);
         NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
         ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
-        isolationTaskDao = mock(IsolationTaskDao.class);
+        isolationMetadataManager = mock(IsolationMetadataManager.class);
+        coronationMetadataManager = mock(CoronationMetadataManager.class);
         workflowExecuteThread =
                 PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao,
                         nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread,
-                        curingGlobalParamsService, isolationTaskDao));
+                        curingGlobalParamsService, isolationMetadataManager, coronationMetadataManager));
         // prepareProcess init dag
         Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
         dag.setAccessible(true);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index daf92071b5..a69da08951 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -121,7 +121,9 @@ public enum CommandType {
      */
     WORKFLOW_EXECUTING_DATA_RESPONSE,
 
-    REFRESH_ISOLATION_REQUEST,
+    REFRESH_ISOLATION_METADATA_REQUEST,
+
+    REFRESH_CORONATION_METADATA_REQUEST,
     ;
 
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/coronation/RefreshCoronationMetadataRequest.java
similarity index 66%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/coronation/RefreshCoronationMetadataRequest.java
index 9b097843c4..4f6d94f2a9 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/coronation/RefreshCoronationMetadataRequest.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.remote.command.isolation;
+package org.apache.dolphinscheduler.remote.command.coronation;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -6,13 +6,14 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 
 import java.io.Serializable;
 
-public class RefreshIsolationTaskRequest implements Serializable {
+public class RefreshCoronationMetadataRequest implements Serializable {
 
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.REFRESH_ISOLATION_REQUEST);
+        command.setType(CommandType.REFRESH_CORONATION_METADATA_REQUEST);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
+
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationMetadataRequest.java
similarity index 68%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationMetadataRequest.java
index 9b097843c4..0241823a08 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationMetadataRequest.java
@@ -1,16 +1,20 @@
 package org.apache.dolphinscheduler.remote.command.isolation;
 
+import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 
 import java.io.Serializable;
 
-public class RefreshIsolationTaskRequest implements Serializable {
+@Data
+@NoArgsConstructor
+public class RefreshIsolationMetadataRequest implements Serializable {
 
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.REFRESH_ISOLATION_REQUEST);
+        command.setType(CommandType.REFRESH_ISOLATION_METADATA_REQUEST);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index c1fe78e78a..43000df020 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1048,10 +1048,9 @@ public class ProcessServiceImpl implements ProcessService {
                 processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 break;
             case RECOVERY_FROM_ISOLATION_TASKS:
-                Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
                 List<Integer> recoveryPausedIsolationIds =
                         JSONUtils.parseObject(
-                                commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
+                                cmdParam.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
                                 new TypeReference<ArrayList<Integer>>() {
                                 });
                 if (CollectionUtils.isNotEmpty(recoveryPausedIsolationIds)) {
@@ -1059,7 +1058,7 @@ public class ProcessServiceImpl implements ProcessService {
                 }
                 List<Integer> recoveryKilledIsolationIds =
                         JSONUtils.parseObject(
-                                commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
+                                cmdParam.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
                                 new TypeReference<ArrayList<Integer>>() {
                                 });
                 if (CollectionUtils.isNotEmpty(recoveryKilledIsolationIds)) {
@@ -1068,6 +1067,18 @@ public class ProcessServiceImpl implements ProcessService {
                 processInstance.setRunTimes(runTime + 1);
                 processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 break;
+            case RECOVERY_FROM_CORONATION_PAUSE_TASKS:
+                List<Integer> pauseByCoronationIds =
+                        JSONUtils.parseObject(cmdParam.get(Constants.CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS),
+                                new TypeReference<ArrayList<Integer>>() {
+                                });
+                for (Integer taskId : pauseByCoronationIds) {
+                    // initialize the pause state
+                    initTaskInstance(this.findTaskInstanceById(taskId));
+                }
+                processInstance.setRunTimes(runTime + 1);
+                processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                break;
             default:
                 break;
         }
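
Note on the RECOVERY_FROM_CORONATION_PAUSE_TASKS branch above: it iterates over the parsed id list directly, while getStartTaskInstanceIdsFromRecoverCoronationParam checks the same parse result for null first. The following is a minimal sketch of a null-safe loop, assuming the parse can yield null when the key is absent. The class NullSafeTaskIdsSketch and the helper orEmpty are hypothetical names used only for illustration; they are not part of this commit.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class NullSafeTaskIdsSketch {

    // Hypothetical helper: fall back to an empty list when the parsed value is null,
    // so that a for-each loop over the result cannot throw a NullPointerException.
    static List<Integer> orEmpty(List<Integer> parsedIds) {
        return parsedIds == null ? Collections.<Integer>emptyList() : parsedIds;
    }

    // Counts how many task ids would be re-initialized; stands in for the loop
    // that calls initTaskInstance(...) in the diff above.
    static int countInitialized(List<Integer> parsedIds) {
        int initialized = 0;
        for (Integer taskId : orEmpty(parsedIds)) {
            initialized++;
        }
        return initialized;
    }

    public static void main(String[] args) {
        // null stands in for a parse result when the command param has no such key (assumption)
        System.out.println(countInitialized(null));
        System.out.println(countInitialized(Arrays.asList(11, 12)));
    }
}
```

Whether a missing key should be treated as "nothing to recover" or as an error is a design choice left to the author.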
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
index a83184e16b..6f78e37629 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
@@ -67,6 +67,8 @@ public enum ExecutionStatus {
     DISPATCH(17, "dispatch"),
     PAUSE_BY_ISOLATION(18, "paused by isolation"),
     KILL_BY_ISOLATION(19, "killed by isolation"),
+
+    PAUSE_BY_CORONATION(20, "paused by coronation"),
     ;
 
     ExecutionStatus(int code, String descp) {
@@ -120,7 +122,7 @@ public enum ExecutionStatus {
      */
     public boolean typeIsFinished() {
         return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() || typeIsPauseByIsolation()
-                || typeIsStop() || typeIsBlock();
+                || typeIsStop() || typeIsBlock() || typeIsPauseByCoronation();
     }
 
     public boolean typeIsReady() {
@@ -149,6 +151,10 @@ public enum ExecutionStatus {
         return this == PAUSE_BY_ISOLATION;
     }
 
+    public boolean typeIsPauseByCoronation() {
+        return this == PAUSE_BY_CORONATION;
+    }
+
     public boolean typeIsKilledByIsolation() {
         return this == KILL_BY_ISOLATION;
     }
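
Note on the ExecutionStatus change above: the new PAUSE_BY_CORONATION status is counted as a finished state, in the same way as PAUSE_BY_ISOLATION. The sketch below illustrates that relationship with a reduced enum. SketchStatus and CoronationStatusSketch are hypothetical names, and only statuses visible in this diff are reproduced; the real typeIsFinished() also covers success, failure, cancel, pause, stop and block.

```java
// Hypothetical, reduced stand-in for ExecutionStatus; not the project's enum.
enum SketchStatus {
    DISPATCH(17, "dispatch"),
    PAUSE_BY_ISOLATION(18, "paused by isolation"),
    PAUSE_BY_CORONATION(20, "paused by coronation");

    private final int code;
    private final String descp;

    SketchStatus(int code, String descp) {
        this.code = code;
        this.descp = descp;
    }

    int getCode() {
        return code;
    }

    String getDescp() {
        return descp;
    }

    boolean typeIsPauseByIsolation() {
        return this == PAUSE_BY_ISOLATION;
    }

    boolean typeIsPauseByCoronation() {
        return this == PAUSE_BY_CORONATION;
    }

    // Reduced on purpose: only the two pause variants from this diff are checked here.
    boolean typeIsFinished() {
        return typeIsPauseByIsolation() || typeIsPauseByCoronation();
    }
}

class CoronationStatusSketch {
    public static void main(String[] args) {
        for (SketchStatus status : SketchStatus.values()) {
            System.out.println(status + " finished=" + status.typeIsFinished());
        }
    }
}
```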


[dolphinscheduler] 03/05: Fix workerGroup will be updated by WorkerDataListener (#183)

Posted by we...@apache.org.

wenjun pushed a commit to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 9ef8d4bb051728b1843a0b7329593c06c6f1688b
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Oct 28 17:32:47 2022 +0800

    Fix workerGroup will be updated by WorkerDataListener (#183)
---
 .../server/master/registry/ServerNodeManager.java             | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 6a758a934a..b8ed0abd6a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -224,20 +224,15 @@ public class ServerNodeManager implements InitializingBean {
                     final String workerAddress = parts[parts.length - 1];
 
                     if (type == Type.ADD) {
+                        // we don't update the workerGroupToAddressMap; this node will be added to the map after 10s
                         logger.info("worker group node : {} added.", path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        logger.info("currentNodes : {}", currentNodes);
-                        syncWorkerGroupNodes(workerGroupName, new HashSet<>(currentNodes));
                     } else if (type == Type.REMOVE) {
                         logger.info("worker group node : {} down.", path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        syncWorkerGroupNodes(workerGroupName, new HashSet<>(currentNodes));
+                        // Remove the node from workerNodeInfo, it will not receive task
+                        workerNodeInfo.remove(workerAddress);
                         alertDao.sendServerStoppedAlert(1, path, "WORKER");
                     } else if (type == Type.UPDATE) {
                         logger.debug("worker group node : {} update, data: {}", path, data);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        syncWorkerGroupNodes(workerGroupName, new HashSet<>(currentNodes));
-
                         syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
                     }
                 } catch (Exception ex) {
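
Note on the listener change above: after this commit the listener no longer re-syncs the worker group on every event. An ADD is only logged (the comment says the node reaches the group map later), a REMOVE drops the worker from workerNodeInfo so it stops receiving tasks, and an UPDATE refreshes that worker's heartbeat. The sketch below models that behaviour under simplifying assumptions: WorkerEventSketch is a hypothetical class, heartbeats are plain strings, and the periodic sync is not modelled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WorkerEventSketch {

    enum Type { ADD, REMOVE, UPDATE }

    // Hypothetical stand-in for workerNodeInfo: worker address -> latest heartbeat payload.
    private final Map<String, String> workerNodeInfo = new ConcurrentHashMap<>();

    void onEvent(Type type, String workerAddress, String heartbeat) {
        switch (type) {
            case ADD:
                // Nothing to do here; a separate periodic sync is assumed to pick the node up.
                break;
            case REMOVE:
                // Drop the node immediately so it is no longer a dispatch candidate.
                workerNodeInfo.remove(workerAddress);
                break;
            case UPDATE:
                // Refresh only this worker's heartbeat instead of re-syncing the whole group.
                workerNodeInfo.put(workerAddress, heartbeat);
                break;
            default:
                break;
        }
    }

    boolean canReceiveTask(String workerAddress) {
        return workerNodeInfo.containsKey(workerAddress);
    }

    public static void main(String[] args) {
        WorkerEventSketch listener = new WorkerEventSketch();
        listener.onEvent(Type.UPDATE, "10.0.0.1:1234", "{\"cpuUsage\":0.1}");
        System.out.println(listener.canReceiveTask("10.0.0.1:1234"));
        listener.onEvent(Type.REMOVE, "10.0.0.1:1234", null);
        System.out.println(listener.canReceiveTask("10.0.0.1:1234"));
    }
}
```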


[dolphinscheduler] 04/05: Change requestBody

Posted by we...@apache.org.

wenjun pushed a commit to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 05de0d280c2f21bd089c3ad6ac0c2669ca5a6b36
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sun Oct 30 18:46:20 2022 +0800

    Change requestBody
---
 .../api/controller/CoronationTaskController.java          | 13 ++++++++++++-
 .../api/dto/request/CoronationTaskListingRequest.java     |  2 ++
 .../api/dto/request/CoronationTaskSubmitRequest.java      |  4 +++-
 .../api/service/impl/CoronationTaskServiceImpl.java       |  5 +++--
 .../api/service/impl/ProcessInstanceServiceImpl.java      | 12 ++++++++++++
 .../dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java       |  4 +++-
 .../apache/dolphinscheduler/dao/mapper/CommandMapper.java |  7 ++++---
 .../dolphinscheduler/dao/mapper/CoronationTaskMapper.java |  1 +
 .../dolphinscheduler/dao/mapper/IsolationTaskMapper.java  |  1 +
 .../dao/repository/CoronationTaskDao.java                 |  1 +
 .../dolphinscheduler/dao/repository/IsolationTaskDao.java |  2 ++
 .../dao/repository/impl/CommandDaoImpl.java               |  2 +-
 .../dao/repository/impl/CoronationTaskDaoImpl.java        |  5 +++++
 .../dao/repository/impl/IsolationTaskDaoImpl.java         |  5 +++++
 .../apache/dolphinscheduler/dao/mapper/CommandMapper.xml  |  9 +++++++++
 .../dolphinscheduler/dao/mapper/CoronationTaskMapper.xml  |  6 ++++++
 .../dolphinscheduler/dao/mapper/IsolationTaskMapper.xml   |  6 ++++++
 .../dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml |  6 ++++++
 .../dolphinscheduler/dao/mapper/TaskInstanceMapper.xml    | 15 ++++++++++++---
 .../coronation/RefreshCoronationMetadataProcessor.java    |  2 +-
 .../server/master/rpc/MasterRPCClient.java                |  4 ++++
 .../server/master/rpc/MasterRPCServer.java                |  1 +
 .../server/master/runner/WorkflowExecuteRunnable.java     | 15 ++++++++++++++-
 .../server/master/service/CoronationMetadataManager.java  | 14 +++++++-------
 .../service/process/ProcessServiceImpl.java               |  3 ++-
 25 files changed, 123 insertions(+), 22 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
index 80ee7a1107..7540c118b9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
@@ -9,6 +9,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.CoronationTaskService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
 import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
@@ -24,6 +25,7 @@ import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestAttribute;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
 import springfox.documentation.annotations.ApiIgnore;
@@ -83,7 +85,16 @@ public class CoronationTaskController {
     @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
     public Result<PageInfo<CoronationTaskDTO>> listingCoronationTasks(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                                                       @PathVariable("projectCode") long projectCode,
-                                                                      @RequestBody CoronationTaskListingRequest request) {
+                                                                      @RequestParam(required = false) String workflowInstanceName,
+                                                                      @RequestParam(required = false) String taskName,
+                                                                      @RequestParam Integer pageNo,
+                                                                      @RequestParam Integer pageSize) {
+        CoronationTaskListingRequest request = CoronationTaskListingRequest.builder()
+                .workflowInstanceName(workflowInstanceName)
+                .taskName(taskName)
+                .pageNo(pageNo)
+                .pageSize(pageSize)
+                .build();
         return Result.success(coronationTaskService.listingCoronationTasks(loginUser, projectCode, request));
 
     }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
index 7ecded2c1c..5e8a384e17 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
@@ -1,12 +1,14 @@
 package org.apache.dolphinscheduler.api.dto.request;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import javax.validation.constraints.NotNull;
 
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class CoronationTaskListingRequest {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
index 2597a1206e..77d9008ee5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
@@ -1,6 +1,7 @@
 package org.apache.dolphinscheduler.api.dto.request;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
@@ -8,9 +9,10 @@ import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
 import java.util.List;
 
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class CoronationTaskSubmitRequest {
 
-    private List<CoronationTaskParseVO> CoronationTasks;
+    private List<CoronationTaskParseVO> coronationTasks;
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
index a6c0691c8b..05269693cb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.coronation.RefreshCoronationMetadataRequest;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
@@ -89,7 +90,7 @@ public class CoronationTaskServiceImpl implements CoronationTaskService {
                         throw new ServiceException(Status.CORONATION_TASK_PARSE_ERROR_TASK_NODE_NAME_IS_NOT_VALIDATED);
                     }
                     List<TaskSimpleInfoDTO> previousTaskNodeDTO =
-                            workflowDAG.getPreviousNodes(Long.toString(vo.getTaskCode()))
+                            DagHelper.getAllPreNodes(Long.toString(vo.getTaskCode()), workflowDAG)
                                     .stream()
                                     .map(previousNodeCode -> {
                                         TaskNode node = workflowDAG.getNode(previousNodeCode);
@@ -126,7 +127,7 @@ public class CoronationTaskServiceImpl implements CoronationTaskService {
 
             List<CoronationTask> coronationTasks = vos.stream()
                     .map(vo -> {
-                        Set<String> previousNodes = workflowDAG.getPreviousNodes(vo.getTaskCode().toString());
+                        Set<String> previousNodes = DagHelper.getAllPreNodes(vo.getTaskCode().toString(), workflowDAG);
                         Set<String> selectNodes = vo.getUpstreamTasks()
                                 .stream()
                                 .map(taskNode -> Long.toString(taskNode.getTaskCode()))
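
Note on the switch from workflowDAG.getPreviousNodes to DagHelper.getAllPreNodes above: judging by the names, the former returns only direct predecessors while the latter presumably returns every upstream node. That reading is an assumption and is not confirmed by this diff. The sketch below shows the difference on a plain adjacency map; AllPreNodesSketch and its methods are hypothetical and do not use the project's DAG class.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class AllPreNodesSketch {

    // parents maps a node code to the codes of its direct upstream nodes.
    static Set<String> directPreNodes(Map<String, Set<String>> parents, String node) {
        return parents.getOrDefault(node, Collections.<String>emptySet());
    }

    // Walks upstream iteratively and collects every ancestor exactly once.
    static Set<String> allPreNodes(Map<String, Set<String>> parents, String node) {
        Set<String> visited = new LinkedHashSet<>();
        Deque<String> pending = new ArrayDeque<>(directPreNodes(parents, node));
        while (!pending.isEmpty()) {
            String current = pending.pop();
            if (visited.add(current)) {
                pending.addAll(directPreNodes(parents, current));
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        // Chain A -> B -> C: C depends on B, B depends on A.
        Map<String, Set<String>> parents = new HashMap<>();
        parents.put("C", Collections.singleton("B"));
        parents.put("B", Collections.singleton("A"));
        System.out.println(directPreNodes(parents, "C"));
        System.out.println(allPreNodes(parents, "C"));
    }
}
```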
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index c5a84a2514..5aa0827221 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -61,6 +61,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -164,6 +166,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
     @Autowired
     private CuringParamsService curingGlobalParamsService;
 
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
+    @Autowired
+    private CoronationTaskDao coronationTaskDao;
+
     /**
      * return top n SUCCESS process instance order by running time which started between startTime and endTime
      */
@@ -656,6 +664,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         processService.deleteWorkProcessMapByParentId(processInstanceId);
         processService.deleteWorkTaskInstanceByProcessInstanceId(processInstanceId);
 
+        // todo: send refresh RPC request
+        isolationTaskDao.deleteByWorkflowInstanceId(processInstanceId);
+        coronationTaskDao.deleteByWorkflowInstanceId(processInstanceId);
+
         Map<String, Object> result = new HashMap<>();
         if (delete > 0) {
             putMsg(result, Status.SUCCESS);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
index 9fe05ca9ca..a917a6014c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
@@ -1,15 +1,17 @@
 package org.apache.dolphinscheduler.dao.dto;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class TaskSimpleInfoDTO {
 
-    private String taskName;
+    private String taskNode;
     private long taskCode;
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index fd1c8d7204..7066a3a9e1 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -50,14 +50,15 @@ public interface CommandMapper extends BaseMapper<Command> {
 
     /**
      * query command page by slot
+     *
      * @return command list
      */
     List<Command> queryCommandPageBySlot(@Param("limit") int limit, @Param("offset") int offset,
                                          @Param("masterCount") int masterCount,
                                          @Param("thisMasterSlot") int thisMasterSlot);
 
-    void batchInsertCommand(List<Command> commands);
+    void batchInsertCommand(@Param("commands") List<Command> commands);
 
-    List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(@Param("workflowInstanceId") long workflowInstanceId,
-                                                                     @Param("command_type") int commandType);
+    List<Command> queryCommandByWorkflowInstanceIdAndCommandType(@Param("workflowInstanceId") long workflowInstanceId,
+                                                                 @Param("commandType") int commandType);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
index d5829de216..5aa7465303 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
@@ -21,4 +21,5 @@ public interface CoronationTaskMapper extends BaseMapper<CoronationTask> {
 
     int queryAllCoronationTaskNumber();
 
+    int deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
index 3bd1045682..7dd14299fd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
@@ -23,4 +23,5 @@ public interface IsolationTaskMapper extends BaseMapper<IsolationTask> {
 
     List<IsolationTask> queryAllIsolationTask();
 
+    int deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
index 1893e8ffd6..f747f8bb47 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
@@ -25,4 +25,5 @@ public interface CoronationTaskDao {
 
     int queryAllCoronationTaskNumber();
 
+    int deleteByWorkflowInstanceId(Integer processInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
index b38b30569a..e9757cf63b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
@@ -30,4 +30,6 @@ public interface IsolationTaskDao {
     void insert(IsolationTask isolationTaskDTO);
 
     void batchInsert(List<IsolationTask> isolationTasks);
+
+    int deleteByWorkflowInstanceId(Integer processInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
index 2365b5f2bb..f66e78d4bd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
@@ -26,7 +26,7 @@ public class CommandDaoImpl implements CommandDao {
 
     @Override
     public List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(long workflowInstanceId) {
-        return commandMapper.queryRecoveryCoronationCommandByWorkflowInstanceId(workflowInstanceId,
+        return commandMapper.queryCommandByWorkflowInstanceIdAndCommandType(workflowInstanceId,
                 CommandType.RECOVERY_FROM_CORONATION_PAUSE_TASKS.getCode());
     }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
index 893b28fbbb..b2892781de 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
@@ -62,4 +62,9 @@ public class CoronationTaskDaoImpl implements CoronationTaskDao {
     public int queryAllCoronationTaskNumber() {
         return coronationTaskMapper.queryAllCoronationTaskNumber();
     }
+
+    @Override
+    public int deleteByWorkflowInstanceId(Integer workflowInstanceId) {
+        return coronationTaskMapper.deleteByWorkflowInstanceId(workflowInstanceId);
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
index 0608adba0b..57d5b31de4 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
@@ -71,4 +71,9 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
         }
         isolationTaskMapper.batchInsert(isolationTasks);
     }
+
+    @Override
+    public int deleteByWorkflowInstanceId(Integer workflowInstanceId) {
+        return isolationTaskMapper.deleteByWorkflowInstanceId(workflowInstanceId);
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index 043a2827b6..15083ba52c 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -80,6 +80,7 @@
             #{command.taskDependType},
             #{command.failureStrategy},
             #{command.warningType},
+            #{command.warningGroupId},
             #{command.scheduleTime},
             #{command.startTime},
             #{command.executorId},
@@ -91,4 +92,12 @@
             )
         </foreach>
     </insert>
+
+    <select id="queryCommandByWorkflowInstanceIdAndCommandType"
+            resultType="org.apache.dolphinscheduler.dao.entity.Command">
+        select *
+        from t_ds_command
+        where process_instance_id = #{workflowInstanceId}
+          and command_type = #{commandType}
+    </select>
 </mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
index 0967fa71ec..fc7bfc9660 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
@@ -66,4 +66,10 @@
         </foreach>
     </insert>
 
+    <delete id="deleteByWorkflowInstanceId">
+        delete
+        from t_ds_coronation_task
+        where workflow_instance_id = #{workflowInstanceId}
+    </delete>
+
 </mapper>
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
index 390591be26..a6f72bddc2 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
@@ -78,4 +78,10 @@
         </foreach>
     </insert>
 
+    <delete id="deleteByWorkflowInstanceId">
+        delete
+        from t_ds_isolation_task
+        where workflow_instance_id = #{workflowInstanceId}
+    </delete>
+
 </mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index f4c0623b70..7fda26c0ad 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -234,6 +234,12 @@
         </foreach>
         order by id asc
     </select>
+    <select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_instance
+        where state = #{state}
+    </select>
     <select id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select
         <include refid="baseSql"/>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 7c63a84e2c..87a4a7c0ab 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -42,9 +42,9 @@
     <select id="queryTaskByProcessIdAndState" resultType="java.lang.Integer">
         select id
         from t_ds_task_instance
-        WHERE  process_instance_id = #{processInstanceId}
-        and state = #{state}
-        and flag = 1
+        WHERE process_instance_id = #{processInstanceId}
+          and state = #{state}
+          and flag = 1
     </select>
     <select id="findValidTaskListByProcessId" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
@@ -54,6 +54,15 @@
         and flag = #{flag}
         order by start_time desc
     </select>
+    <select id="findValidTaskListByProcessIdAndTaskStatus"
+            resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_task_instance
+        WHERE process_instance_id = #{processInstanceId}
+        and state = #{status}
+        and flag = #{flag}
+    </select>
     <select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
         <include refid="baseSql"/>
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
index 1a88860490..2faeee6f5a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
@@ -18,7 +18,7 @@ public class RefreshCoronationMetadataProcessor implements NettyRequestProcessor
 
     @Override
     public void process(Channel channel, Command command) {
-        if (command.getType() != CommandType.REFRESH_ISOLATION_METADATA_REQUEST) {
+        if (command.getType() != CommandType.REFRESH_CORONATION_METADATA_REQUEST) {
             throw new IllegalArgumentException(String.format("The current rpc command : %s is invalidated", command));
         }
         coronationMetadataManager.refreshCoronationTaskMetadata();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
index 3522ac4fec..f56aef464a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
@@ -33,4 +33,8 @@ public class MasterRPCClient {
         client.sendSync(host, rpcCommand, timeoutMills);
     }
 
+    public void sendCommand(@NonNull Host host, @NonNull Command rpcCommand) throws RemotingException {
+        client.send(host, rpcCommand);
+    }
+
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 4644611a7f..3aba5c6bde 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -74,6 +74,7 @@ public class MasterRPCServer implements AutoCloseable {
     @Autowired
     private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
 
+    @Autowired
     private RefreshCoronationMetadataProcessor refreshCoronationMetadataProcessor;
 
     @Autowired
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index b284c80ef2..ebe2000626 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -1655,6 +1655,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 // No task need to submit, and exist isolation task, the workflow instance need to be PAUSE_BY_ISOLATION
                 return ExecutionStatus.PAUSE_BY_ISOLATION;
             }
+            if (coronationMetadataManager.isInCoronationMode()) {
+                Optional<TaskInstance> pauseByCoronationTaskInstance = taskInstanceMap.values().stream()
+                        .filter(taskInstance -> taskInstance.getState().typeIsPauseByCoronation())
+                        .findAny();
+                if (pauseByCoronationTaskInstance.isPresent()) {
+                    return ExecutionStatus.PAUSE_BY_CORONATION;
+                }
+            }
+
             // if the waiting queue is empty and the status is in progress, then success
             return ExecutionStatus.SUCCESS;
         }
@@ -1999,7 +2008,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                     coronattedTaskCodeToTimesMap.getOrDefault(taskCode, 0) + 1);
             Integer parentNodeInstanceId = validTaskMap.get(parentNodeCode);
             if (parentNodeInstanceId != null) {
-                TaskInstance taskInstance = activeTaskProcessorMaps.get(parentNodeInstanceId).taskInstance();
+                ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(parentNodeInstanceId);
+                if (iTaskProcessor == null) {
+                    continue;
+                }
+                TaskInstance taskInstance = iTaskProcessor.taskInstance();
                 if (taskInstance.getState().typeIsPauseByCoronation()) {
                     // resubmit the task to standbylist, this task will be resubmit again.
                     taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
index 68aba94bda..d698c3f20b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
@@ -94,10 +94,10 @@ public class CoronationMetadataManager {
         if (coronationTaskInMemory.isEmpty()) {
             if (coronationMode == CoronationMode.IN_CORONATION) {
                 log.info("There is not coronation tasks, will begin to close coronation mode...");
-                closeCoronation();
                 coronationMode = CoronationMode.NOT_IN_CORONATION;
                 log.info("Close coronation mode success...");
             }
+            insertRecoveryCoronationCommandIfNeeded();
         } else {
             addCoronationTasks(addCoronationTasks);
             cancelCoronationTasks(deleteCoronationTasks);
@@ -107,7 +107,7 @@ public class CoronationMetadataManager {
             }
         }
         stopWatch.stop();
-        log.info("Refresh coronation task from DB finished, cost: {}", stopWatch.getTime());
+        log.info("Refresh coronation task from DB finished, cost: {} ms", stopWatch.getTime());
     }
 
     public boolean isCoronationTask(int workflowInstanceId, long taskCode) {
@@ -135,7 +135,7 @@ public class CoronationMetadataManager {
                 RefreshCoronationMetadataRequest request = new RefreshCoronationMetadataRequest();
                 for (Server master : masters) {
                     try {
-                        masterRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
+                        masterRPCClient.sendCommand(new Host(master.getHost(), master.getPort()),
                                 request.convert2Command());
                     } catch (Exception e) {
                         log.error(
@@ -151,19 +151,19 @@ public class CoronationMetadataManager {
         }
     }
 
-    private void closeCoronation() {
+    private void insertRecoveryCoronationCommandIfNeeded() {
         // The current server is in coronation mode, need to close coronation.
         // Need to acquire a lock to guarantee there is only one master recovery the pause_by_coronation workflow
+        // block to acquire the master lock
         try {
-            // block to acquire the master lock
             if (!registryClient.getLock(NodeType.MASTER.getRegistryPath())) {
-                log.error("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath());
+                log.warn("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath());
                 return;
             }
             // find the all instance that need to be recovery
             // create recovery command
             List<Command> needToInsertCommand =
-                    processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_ISOLATION)
+                    processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_CORONATION)
                             .stream()
                             .filter(processInstance -> {
                                 List<Command> commands = commandDao
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 43000df020..38d6535f1d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1633,7 +1633,8 @@ public class ProcessServiceImpl implements ProcessService {
                 || state == ExecutionStatus.DELAY_EXECUTION
                 || state == ExecutionStatus.KILL
                 || state == ExecutionStatus.DISPATCH
-                || state == ExecutionStatus.PAUSE_BY_ISOLATION) {
+                || state == ExecutionStatus.PAUSE_BY_ISOLATION
+                || state == ExecutionStatus.PAUSE_BY_CORONATION) {
             return state;
         }
         // return pause / stop if process instance state is ready pause / stop


[dolphinscheduler] 01/05: Catch exception when insert alert failed (#179)

Posted by we...@apache.org.

wenjun pushed a commit to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 7e7b05185269b27867535824b16e2ea0f9ab5ea5
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Oct 26 11:33:59 2022 +0800

    Catch exception when insert alert failed (#179)
---
 .../master/runner/WorkflowExecuteRunnable.java     | 23 +++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index e102238178..24d5d31451 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -359,13 +359,21 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     }
 
     public void processTimeout() {
-        ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-        this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser);
+        try {
+            ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+            this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser);
+        } catch (Exception ex) {
+            logger.error("Send workflow instance timeout alert error", ex);
+        }
     }
 
     public void taskTimeout(TaskInstance taskInstance) {
-        ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-        processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
+        try {
+            ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+            processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
+        } catch (Exception ex) {
+            logger.error("Send task instance timeout alert error", ex);
+        }
     }
 
     public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
@@ -802,7 +810,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
         ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
         if (processAlertManager.isNeedToSendWarning(processInstance)) {
-            processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
+            try {
+                processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
+            } catch (Exception ex) {
+                // the trace id has been set on upstream
+                logger.error("Send workflow instance alert failed");
+            }
         }
         if (processInstance.getState().typeIsSuccess()) {
             processAlertManager.closeAlert(processInstance);
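
As an illustration only, the guard this commit adds around alert sending can be sketched outside DolphinScheduler as follows. `AlertGuardSketch`, `AlertSender` and `sendQuietly` are hypothetical names that do not exist in the project. The sketch assumes only that sending an alert may throw (for example when the alert insert fails) and that such a failure should be logged rather than propagated into the timeout or completion handling.

```java
final class AlertGuardSketch {

    /** Hypothetical stand-in for an alert channel whose send may fail. */
    interface AlertSender {
        void send(String message) throws Exception;
    }

    /**
     * Sends the alert and swallows any failure, so the caller's own
     * state handling continues. Returns true only if the send succeeded.
     */
    static boolean sendQuietly(AlertSender sender, String message) {
        try {
            sender.send(message);
            return true;
        } catch (Exception ex) {
            // Log and continue; the alert is best-effort in this sketch.
            System.err.println("Send alert failed: " + ex.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // A sender that always fails, standing in for a failed alert insert.
        AlertSender failing = m -> {
            throw new IllegalStateException("alert table unavailable");
        };
        boolean sent = sendQuietly(failing, "workflow timeout");
        System.out.println("alert sent: " + sent);
    }
}
```

Returning a boolean is a choice made for this sketch so the outcome can be checked; the methods in the commit return void and only log.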


[dolphinscheduler] 05/05: Refresh coronation when delete workflow instance

Posted by we...@apache.org.

wenjun pushed a commit to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 040a9eb71865d216202a38d35102e8e98357fe6b
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Nov 1 09:29:10 2022 +0800

    Refresh coronation when delete workflow instance
---
 .../server/master/service/IsolationMetadataManager.java               | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java
index d90a95a42d..25b3014cc7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java
@@ -47,12 +47,12 @@ public class IsolationMetadataManager {
                         .stream()
                         .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
 
-        List<IsolationTask> needToAddIsolationTasks = isolationTasksInMemory.values()
+        List<IsolationTask> needToCancelIsolationTasks = isolationTasksInMemory.values()
                 .stream()
                 .filter(isolationTask -> !isolationTasksInDB.containsKey(isolationTask.getId()))
                 .collect(Collectors.toList());
 
-        List<IsolationTask> needToCancelIsolationTasks = isolationTasksInDB.values()
+        List<IsolationTask> needToAddIsolationTasks = isolationTasksInDB.values()
                 .stream()
                 .filter(isolationTask -> !isolationTasksInMemory.containsKey(isolationTask.getId()))
                 .collect(Collectors.toList());
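
The renamed variables in this hunk amount to two set differences keyed by task id: entries held in memory but missing from the DB are cancelled, and entries in the DB but missing from memory are added. A minimal standalone sketch of that comparison is given below. `IsolationDiffSketch`, `Task`, `byId` and `idsOnlyIn` are hypothetical helpers written for illustration, and `Task` is a simplified stand-in for `IsolationTask` that carries only an id.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class IsolationDiffSketch {

    /** Hypothetical stand-in for IsolationTask; only the id matters here. */
    static final class Task {
        private final long id;

        Task(long id) {
            this.id = id;
        }

        long getId() {
            return id;
        }
    }

    /** Builds an id-keyed map, similar in shape to the maps in the hunk. */
    static Map<Long, Task> byId(long... ids) {
        Map<Long, Task> tasks = new HashMap<>();
        for (long id : ids) {
            tasks.put(id, new Task(id));
        }
        return tasks;
    }

    /** Ids present in `left` but absent from `right`, in ascending order. */
    static List<Long> idsOnlyIn(Map<Long, Task> left, Map<Long, Task> right) {
        return left.values().stream()
                .map(Task::getId)
                .filter(id -> !right.containsKey(id))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Long, Task> inMemory = byId(1, 2);
        Map<Long, Task> inDb = byId(2, 3);
        // In memory but no longer in the DB: cancel.
        System.out.println("cancel: " + idsOnlyIn(inMemory, inDb));
        // In the DB but not yet in memory: add.
        System.out.println("add: " + idsOnlyIn(inDb, inMemory));
    }
}
```

With memory holding ids 1 and 2 and the DB holding 2 and 3, the sketch reports id 1 for cancellation and id 3 for addition, which matches the direction of the corrected variable names.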