You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/11/01 01:30:45 UTC

[dolphinscheduler] 01/02: Fix coronation tasks that cannot be recovered

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit d92c65b72eec8e9d0389ada55040f1882d1e9c88
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Oct 31 23:13:48 2022 +0800

    Fix coronation tasks that cannot be recovered
---
 .../api/controller/CoronationTaskController.java          | 13 ++++++++++++-
 .../api/dto/request/CoronationTaskListingRequest.java     |  2 ++
 .../api/dto/request/CoronationTaskSubmitRequest.java      |  4 +++-
 .../api/service/impl/CoronationTaskServiceImpl.java       |  5 +++--
 .../api/service/impl/ProcessInstanceServiceImpl.java      | 12 ++++++++++++
 .../dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java       |  4 +++-
 .../apache/dolphinscheduler/dao/mapper/CommandMapper.java |  7 ++++---
 .../dolphinscheduler/dao/mapper/CoronationTaskMapper.java |  1 +
 .../dolphinscheduler/dao/mapper/IsolationTaskMapper.java  |  1 +
 .../dao/repository/CoronationTaskDao.java                 |  1 +
 .../dolphinscheduler/dao/repository/IsolationTaskDao.java |  2 ++
 .../dao/repository/impl/CommandDaoImpl.java               |  2 +-
 .../dao/repository/impl/CoronationTaskDaoImpl.java        |  5 +++++
 .../dao/repository/impl/IsolationTaskDaoImpl.java         |  5 +++++
 .../apache/dolphinscheduler/dao/mapper/CommandMapper.xml  |  9 +++++++++
 .../dolphinscheduler/dao/mapper/CoronationTaskMapper.xml  |  6 ++++++
 .../dolphinscheduler/dao/mapper/IsolationTaskMapper.xml   |  6 ++++++
 .../dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml |  6 ++++++
 .../dolphinscheduler/dao/mapper/TaskInstanceMapper.xml    | 15 ++++++++++++---
 .../coronation/RefreshCoronationMetadataProcessor.java    |  2 +-
 .../server/master/rpc/MasterRPCClient.java                |  4 ++++
 .../server/master/rpc/MasterRPCServer.java                |  1 +
 .../server/master/runner/WorkflowExecuteRunnable.java     | 15 ++++++++++++++-
 .../server/master/service/CoronationMetadataManager.java  | 14 +++++++-------
 .../service/process/ProcessServiceImpl.java               |  3 ++-
 25 files changed, 123 insertions(+), 22 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
index 80ee7a1107..7540c118b9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
@@ -9,6 +9,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.CoronationTaskService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
 import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
@@ -24,6 +25,7 @@ import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestAttribute;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
 import springfox.documentation.annotations.ApiIgnore;
@@ -83,7 +85,16 @@ public class CoronationTaskController {
     @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
     public Result<PageInfo<CoronationTaskDTO>> listingCoronationTasks(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                                                       @PathVariable("projectCode") long projectCode,
-                                                                      @RequestBody CoronationTaskListingRequest request) {
+                                                                      @RequestParam(required = false) String workflowInstanceName,
+                                                                      @RequestParam(required = false) String taskName,
+                                                                      @RequestParam Integer pageNo,
+                                                                      @RequestParam Integer pageSize) {
+        CoronationTaskListingRequest request = CoronationTaskListingRequest.builder()
+                .workflowInstanceName(workflowInstanceName)
+                .taskName(taskName)
+                .pageNo(pageNo)
+                .pageSize(pageSize)
+                .build();
         return Result.success(coronationTaskService.listingCoronationTasks(loginUser, projectCode, request));
 
     }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
index 7ecded2c1c..5e8a384e17 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
@@ -1,12 +1,14 @@
 package org.apache.dolphinscheduler.api.dto.request;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import javax.validation.constraints.NotNull;
 
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class CoronationTaskListingRequest {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
index 2597a1206e..77d9008ee5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
@@ -1,6 +1,7 @@
 package org.apache.dolphinscheduler.api.dto.request;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
@@ -8,9 +9,10 @@ import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
 import java.util.List;
 
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class CoronationTaskSubmitRequest {
 
-    private List<CoronationTaskParseVO> CoronationTasks;
+    private List<CoronationTaskParseVO> coronationTasks;
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
index a6c0691c8b..05269693cb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.coronation.RefreshCoronationMetadataRequest;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
@@ -89,7 +90,7 @@ public class CoronationTaskServiceImpl implements CoronationTaskService {
                         throw new ServiceException(Status.CORONATION_TASK_PARSE_ERROR_TASK_NODE_NAME_IS_NOT_VALIDATED);
                     }
                     List<TaskSimpleInfoDTO> previousTaskNodeDTO =
-                            workflowDAG.getPreviousNodes(Long.toString(vo.getTaskCode()))
+                            DagHelper.getAllPreNodes(Long.toString(vo.getTaskCode()), workflowDAG)
                                     .stream()
                                     .map(previousNodeCode -> {
                                         TaskNode node = workflowDAG.getNode(previousNodeCode);
@@ -126,7 +127,7 @@ public class CoronationTaskServiceImpl implements CoronationTaskService {
 
             List<CoronationTask> coronationTasks = vos.stream()
                     .map(vo -> {
-                        Set<String> previousNodes = workflowDAG.getPreviousNodes(vo.getTaskCode().toString());
+                        Set<String> previousNodes = DagHelper.getAllPreNodes(vo.getTaskCode().toString(), workflowDAG);
                         Set<String> selectNodes = vo.getUpstreamTasks()
                                 .stream()
                                 .map(taskNode -> Long.toString(taskNode.getTaskCode()))
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index c5a84a2514..5aa0827221 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -61,6 +61,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -164,6 +166,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
     @Autowired
     private CuringParamsService curingGlobalParamsService;
 
+    @Autowired
+    private IsolationTaskDao isolationTaskDao;
+
+    @Autowired
+    private CoronationTaskDao coronationTaskDao;
+
     /**
      * return top n SUCCESS process instance order by running time which started between startTime and endTime
      */
@@ -656,6 +664,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         processService.deleteWorkProcessMapByParentId(processInstanceId);
         processService.deleteWorkTaskInstanceByProcessInstanceId(processInstanceId);
 
+        // todo: send refresh RPC request
+        isolationTaskDao.deleteByWorkflowInstanceId(processInstanceId);
+        coronationTaskDao.deleteByWorkflowInstanceId(processInstanceId);
+
         Map<String, Object> result = new HashMap<>();
         if (delete > 0) {
             putMsg(result, Status.SUCCESS);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
index 9fe05ca9ca..a917a6014c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
@@ -1,15 +1,17 @@
 package org.apache.dolphinscheduler.dao.dto;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class TaskSimpleInfoDTO {
 
-    private String taskName;
+    private String taskNode;
     private long taskCode;
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index fd1c8d7204..7066a3a9e1 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -50,14 +50,15 @@ public interface CommandMapper extends BaseMapper<Command> {
 
     /**
      * query command page by slot
+     *
      * @return command list
      */
     List<Command> queryCommandPageBySlot(@Param("limit") int limit, @Param("offset") int offset,
                                          @Param("masterCount") int masterCount,
                                          @Param("thisMasterSlot") int thisMasterSlot);
 
-    void batchInsertCommand(List<Command> commands);
+    void batchInsertCommand(@Param("commands") List<Command> commands);
 
-    List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(@Param("workflowInstanceId") long workflowInstanceId,
-                                                                     @Param("command_type") int commandType);
+    List<Command> queryCommandByWorkflowInstanceIdAndCommandType(@Param("workflowInstanceId") long workflowInstanceId,
+                                                                 @Param("commandType") int commandType);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
index d5829de216..5aa7465303 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
@@ -21,4 +21,5 @@ public interface CoronationTaskMapper extends BaseMapper<CoronationTask> {
 
     int queryAllCoronationTaskNumber();
 
+    int deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
index 3bd1045682..7dd14299fd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
@@ -23,4 +23,5 @@ public interface IsolationTaskMapper extends BaseMapper<IsolationTask> {
 
     List<IsolationTask> queryAllIsolationTask();
 
+    int deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
index 1893e8ffd6..f747f8bb47 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
@@ -25,4 +25,5 @@ public interface CoronationTaskDao {
 
     int queryAllCoronationTaskNumber();
 
+    int deleteByWorkflowInstanceId(Integer processInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
index b38b30569a..e9757cf63b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
@@ -30,4 +30,6 @@ public interface IsolationTaskDao {
     void insert(IsolationTask isolationTaskDTO);
 
     void batchInsert(List<IsolationTask> isolationTasks);
+
+    int deleteByWorkflowInstanceId(Integer processInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
index 2365b5f2bb..f66e78d4bd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
@@ -26,7 +26,7 @@ public class CommandDaoImpl implements CommandDao {
 
     @Override
     public List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(long workflowInstanceId) {
-        return commandMapper.queryRecoveryCoronationCommandByWorkflowInstanceId(workflowInstanceId,
+        return commandMapper.queryCommandByWorkflowInstanceIdAndCommandType(workflowInstanceId,
                 CommandType.RECOVERY_FROM_CORONATION_PAUSE_TASKS.getCode());
     }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
index 893b28fbbb..b2892781de 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
@@ -62,4 +62,9 @@ public class CoronationTaskDaoImpl implements CoronationTaskDao {
     public int queryAllCoronationTaskNumber() {
         return coronationTaskMapper.queryAllCoronationTaskNumber();
     }
+
+    @Override
+    public int deleteByWorkflowInstanceId(Integer workflowInstanceId) {
+        return coronationTaskMapper.deleteByWorkflowInstanceId(workflowInstanceId);
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
index 0608adba0b..57d5b31de4 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
@@ -71,4 +71,9 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
         }
         isolationTaskMapper.batchInsert(isolationTasks);
     }
+
+    @Override
+    public int deleteByWorkflowInstanceId(Integer workflowInstanceId) {
+        return isolationTaskMapper.deleteByWorkflowInstanceId(workflowInstanceId);
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index 043a2827b6..15083ba52c 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -80,6 +80,7 @@
             #{command.taskDependType},
             #{command.failureStrategy},
             #{command.warningType},
+            #{command.warningGroupId},
             #{command.scheduleTime},
             #{command.startTime},
             #{command.executorId},
@@ -91,4 +92,12 @@
             )
         </foreach>
     </insert>
+
+    <select id="queryCommandByWorkflowInstanceIdAndCommandType"
+            resultType="org.apache.dolphinscheduler.dao.entity.Command">
+        select *
+        from t_ds_command
+        where process_instance_id = #{workflowInstanceId}
+          and command_type = #{commandType}
+    </select>
 </mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
index 0967fa71ec..fc7bfc9660 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
@@ -66,4 +66,10 @@
         </foreach>
     </insert>
 
+    <delete id="deleteByWorkflowInstanceId">
+        delete
+        from t_ds_coronation_task
+        where workflow_instance_id = #{workflowInstanceId}
+    </delete>
+
 </mapper>
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
index 390591be26..a6f72bddc2 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
@@ -78,4 +78,10 @@
         </foreach>
     </insert>
 
+    <delete id="deleteByWorkflowInstanceId">
+        delete
+        from t_ds_isolation_task
+        where workflow_instance_id = #{workflowInstanceId}
+    </delete>
+
 </mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index f4c0623b70..7fda26c0ad 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -234,6 +234,12 @@
         </foreach>
         order by id asc
     </select>
+    <select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_instance
+        where state = #{state}
+    </select>
     <select id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select
         <include refid="baseSql"/>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 7c63a84e2c..87a4a7c0ab 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -42,9 +42,9 @@
     <select id="queryTaskByProcessIdAndState" resultType="java.lang.Integer">
         select id
         from t_ds_task_instance
-        WHERE  process_instance_id = #{processInstanceId}
-        and state = #{state}
-        and flag = 1
+        WHERE process_instance_id = #{processInstanceId}
+          and state = #{state}
+          and flag = 1
     </select>
     <select id="findValidTaskListByProcessId" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
@@ -54,6 +54,15 @@
         and flag = #{flag}
         order by start_time desc
     </select>
+    <select id="findValidTaskListByProcessIdAndTaskStatus"
+            resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_task_instance
+        WHERE process_instance_id = #{processInstanceId}
+        and state = #{status}
+        and flag = #{flag}
+    </select>
     <select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
         <include refid="baseSql"/>
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
index 1a88860490..2faeee6f5a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
@@ -18,7 +18,7 @@ public class RefreshCoronationMetadataProcessor implements NettyRequestProcessor
 
     @Override
     public void process(Channel channel, Command command) {
-        if (command.getType() != CommandType.REFRESH_ISOLATION_METADATA_REQUEST) {
+        if (command.getType() != CommandType.REFRESH_CORONATION_METADATA_REQUEST) {
             throw new IllegalArgumentException(String.format("The current rpc command : %s is invalidated", command));
         }
         coronationMetadataManager.refreshCoronationTaskMetadata();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
index 3522ac4fec..f56aef464a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
@@ -33,4 +33,8 @@ public class MasterRPCClient {
         client.sendSync(host, rpcCommand, timeoutMills);
     }
 
+    public void sendCommand(@NonNull Host host, @NonNull Command rpcCommand) throws RemotingException {
+        client.send(host, rpcCommand);
+    }
+
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 4644611a7f..3aba5c6bde 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -74,6 +74,7 @@ public class MasterRPCServer implements AutoCloseable {
     @Autowired
     private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
 
+    @Autowired
     private RefreshCoronationMetadataProcessor refreshCoronationMetadataProcessor;
 
     @Autowired
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index b284c80ef2..ebe2000626 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -1655,6 +1655,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 // No task need to submit, and exist isolation task, the workflow instance need to be PAUSE_BY_ISOLATION
                 return ExecutionStatus.PAUSE_BY_ISOLATION;
             }
+            if (coronationMetadataManager.isInCoronationMode()) {
+                Optional<TaskInstance> pauseByCoronationTaskInstance = taskInstanceMap.values().stream()
+                        .filter(taskInstance -> taskInstance.getState().typeIsPauseByCoronation())
+                        .findAny();
+                if (pauseByCoronationTaskInstance.isPresent()) {
+                    return ExecutionStatus.PAUSE_BY_CORONATION;
+                }
+            }
+
             // if the waiting queue is empty and the status is in progress, then success
             return ExecutionStatus.SUCCESS;
         }
@@ -1999,7 +2008,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                     coronattedTaskCodeToTimesMap.getOrDefault(taskCode, 0) + 1);
             Integer parentNodeInstanceId = validTaskMap.get(parentNodeCode);
             if (parentNodeInstanceId != null) {
-                TaskInstance taskInstance = activeTaskProcessorMaps.get(parentNodeInstanceId).taskInstance();
+                ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(parentNodeInstanceId);
+                if (iTaskProcessor == null) {
+                    continue;
+                }
+                TaskInstance taskInstance = iTaskProcessor.taskInstance();
                 if (taskInstance.getState().typeIsPauseByCoronation()) {
                     // resubmit the task to standbylist, this task will be resubmit again.
                     taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
index 68aba94bda..d698c3f20b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
@@ -94,10 +94,10 @@ public class CoronationMetadataManager {
         if (coronationTaskInMemory.isEmpty()) {
             if (coronationMode == CoronationMode.IN_CORONATION) {
                 log.info("There is not coronation tasks, will begin to close coronation mode...");
-                closeCoronation();
                 coronationMode = CoronationMode.NOT_IN_CORONATION;
                 log.info("Close coronation mode success...");
             }
+            insertRecoveryCoronationCommandIfNeeded();
         } else {
             addCoronationTasks(addCoronationTasks);
             cancelCoronationTasks(deleteCoronationTasks);
@@ -107,7 +107,7 @@ public class CoronationMetadataManager {
             }
         }
         stopWatch.stop();
-        log.info("Refresh coronation task from DB finished, cost: {}", stopWatch.getTime());
+        log.info("Refresh coronation task from DB finished, cost: {} ms", stopWatch.getTime());
     }
 
     public boolean isCoronationTask(int workflowInstanceId, long taskCode) {
@@ -135,7 +135,7 @@ public class CoronationMetadataManager {
                 RefreshCoronationMetadataRequest request = new RefreshCoronationMetadataRequest();
                 for (Server master : masters) {
                     try {
-                        masterRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
+                        masterRPCClient.sendCommand(new Host(master.getHost(), master.getPort()),
                                 request.convert2Command());
                     } catch (Exception e) {
                         log.error(
@@ -151,19 +151,19 @@ public class CoronationMetadataManager {
         }
     }
 
-    private void closeCoronation() {
+    private void insertRecoveryCoronationCommandIfNeeded() {
         // The current server is in coronation mode, need to close coronation.
         // Need to acquire a lock to guarantee there is only one master recovery the pause_by_coronation workflow
+        // block to acquire the master lock
         try {
-            // block to acquire the master lock
             if (!registryClient.getLock(NodeType.MASTER.getRegistryPath())) {
-                log.error("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath());
+                log.warn("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath());
                 return;
             }
             // find the all instance that need to be recovery
             // create recovery command
             List<Command> needToInsertCommand =
-                    processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_ISOLATION)
+                    processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_CORONATION)
                             .stream()
                             .filter(processInstance -> {
                                 List<Command> commands = commandDao
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 43000df020..38d6535f1d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1633,7 +1633,8 @@ public class ProcessServiceImpl implements ProcessService {
                 || state == ExecutionStatus.DELAY_EXECUTION
                 || state == ExecutionStatus.KILL
                 || state == ExecutionStatus.DISPATCH
-                || state == ExecutionStatus.PAUSE_BY_ISOLATION) {
+                || state == ExecutionStatus.PAUSE_BY_ISOLATION
+                || state == ExecutionStatus.PAUSE_BY_CORONATION) {
             return state;
         }
         // return pasue /stop if process instance state is ready pause / stop