You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/11/01 01:30:29 UTC
[dolphinscheduler] 04/05: Change requestBody
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 05de0d280c2f21bd089c3ad6ac0c2669ca5a6b36
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sun Oct 30 18:46:20 2022 +0800
Change requestBody
---
.../api/controller/CoronationTaskController.java | 13 ++++++++++++-
.../api/dto/request/CoronationTaskListingRequest.java | 2 ++
.../api/dto/request/CoronationTaskSubmitRequest.java | 4 +++-
.../api/service/impl/CoronationTaskServiceImpl.java | 5 +++--
.../api/service/impl/ProcessInstanceServiceImpl.java | 12 ++++++++++++
.../dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java | 4 +++-
.../apache/dolphinscheduler/dao/mapper/CommandMapper.java | 7 ++++---
.../dolphinscheduler/dao/mapper/CoronationTaskMapper.java | 1 +
.../dolphinscheduler/dao/mapper/IsolationTaskMapper.java | 1 +
.../dao/repository/CoronationTaskDao.java | 1 +
.../dolphinscheduler/dao/repository/IsolationTaskDao.java | 2 ++
.../dao/repository/impl/CommandDaoImpl.java | 2 +-
.../dao/repository/impl/CoronationTaskDaoImpl.java | 5 +++++
.../dao/repository/impl/IsolationTaskDaoImpl.java | 5 +++++
.../apache/dolphinscheduler/dao/mapper/CommandMapper.xml | 9 +++++++++
.../dolphinscheduler/dao/mapper/CoronationTaskMapper.xml | 6 ++++++
.../dolphinscheduler/dao/mapper/IsolationTaskMapper.xml | 6 ++++++
.../dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml | 6 ++++++
.../dolphinscheduler/dao/mapper/TaskInstanceMapper.xml | 15 ++++++++++++---
.../coronation/RefreshCoronationMetadataProcessor.java | 2 +-
.../server/master/rpc/MasterRPCClient.java | 4 ++++
.../server/master/rpc/MasterRPCServer.java | 1 +
.../server/master/runner/WorkflowExecuteRunnable.java | 15 ++++++++++++++-
.../server/master/service/CoronationMetadataManager.java | 14 +++++++-------
.../service/process/ProcessServiceImpl.java | 3 ++-
25 files changed, 123 insertions(+), 22 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
index 80ee7a1107..7540c118b9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
@@ -9,6 +9,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.CoronationTaskService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
@@ -24,6 +25,7 @@ import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;
@@ -83,7 +85,16 @@ public class CoronationTaskController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<PageInfo<CoronationTaskDTO>> listingCoronationTasks(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("projectCode") long projectCode,
- @RequestBody CoronationTaskListingRequest request) {
+ @RequestParam(required = false) String workflowInstanceName,
+ @RequestParam(required = false) String taskName,
+ @RequestParam Integer pageNo,
+ @RequestParam Integer pageSize) {
+ CoronationTaskListingRequest request = CoronationTaskListingRequest.builder()
+ .workflowInstanceName(workflowInstanceName)
+ .taskName(taskName)
+ .pageNo(pageNo)
+ .pageSize(pageSize)
+ .build();
return Result.success(coronationTaskService.listingCoronationTasks(loginUser, projectCode, request));
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
index 7ecded2c1c..5e8a384e17 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
@@ -1,12 +1,14 @@
package org.apache.dolphinscheduler.api.dto.request;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotNull;
@Data
+@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CoronationTaskListingRequest {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
index 2597a1206e..77d9008ee5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
@@ -1,6 +1,7 @@
package org.apache.dolphinscheduler.api.dto.request;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
@@ -8,9 +9,10 @@ import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
import java.util.List;
@Data
+@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CoronationTaskSubmitRequest {
- private List<CoronationTaskParseVO> CoronationTasks;
+ private List<CoronationTaskParseVO> coronationTasks;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
index a6c0691c8b..05269693cb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.coronation.RefreshCoronationMetadataRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
@@ -89,7 +90,7 @@ public class CoronationTaskServiceImpl implements CoronationTaskService {
throw new ServiceException(Status.CORONATION_TASK_PARSE_ERROR_TASK_NODE_NAME_IS_NOT_VALIDATED);
}
List<TaskSimpleInfoDTO> previousTaskNodeDTO =
- workflowDAG.getPreviousNodes(Long.toString(vo.getTaskCode()))
+ DagHelper.getAllPreNodes(Long.toString(vo.getTaskCode()), workflowDAG)
.stream()
.map(previousNodeCode -> {
TaskNode node = workflowDAG.getNode(previousNodeCode);
@@ -126,7 +127,7 @@ public class CoronationTaskServiceImpl implements CoronationTaskService {
List<CoronationTask> coronationTasks = vos.stream()
.map(vo -> {
- Set<String> previousNodes = workflowDAG.getPreviousNodes(vo.getTaskCode().toString());
+ Set<String> previousNodes = DagHelper.getAllPreNodes(vo.getTaskCode().toString(), workflowDAG);
Set<String> selectNodes = vo.getUpstreamTasks()
.stream()
.map(taskNode -> Long.toString(taskNode.getTaskCode()))
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index c5a84a2514..5aa0827221 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -61,6 +61,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -164,6 +166,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
private CuringParamsService curingGlobalParamsService;
+ @Autowired
+ private IsolationTaskDao isolationTaskDao;
+
+ @Autowired
+ private CoronationTaskDao coronationTaskDao;
+
/**
* return top n SUCCESS process instance order by running time which started between startTime and endTime
*/
@@ -656,6 +664,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processService.deleteWorkProcessMapByParentId(processInstanceId);
processService.deleteWorkTaskInstanceByProcessInstanceId(processInstanceId);
+ // todo: send refresh RPC request
+ isolationTaskDao.deleteByWorkflowInstanceId(processInstanceId);
+ coronationTaskDao.deleteByWorkflowInstanceId(processInstanceId);
+
Map<String, Object> result = new HashMap<>();
if (delete > 0) {
putMsg(result, Status.SUCCESS);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
index 9fe05ca9ca..a917a6014c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
@@ -1,15 +1,17 @@
package org.apache.dolphinscheduler.dao.dto;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@Data
+@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskSimpleInfoDTO {
- private String taskName;
+ private String taskNode;
private long taskCode;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index fd1c8d7204..7066a3a9e1 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -50,14 +50,15 @@ public interface CommandMapper extends BaseMapper<Command> {
/**
* query command page by slot
+ *
* @return command list
*/
List<Command> queryCommandPageBySlot(@Param("limit") int limit, @Param("offset") int offset,
@Param("masterCount") int masterCount,
@Param("thisMasterSlot") int thisMasterSlot);
- void batchInsertCommand(List<Command> commands);
+ void batchInsertCommand(@Param("commands") List<Command> commands);
- List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(@Param("workflowInstanceId") long workflowInstanceId,
- @Param("command_type") int commandType);
+ List<Command> queryCommandByWorkflowInstanceIdAndCommandType(@Param("workflowInstanceId") long workflowInstanceId,
+ @Param("commandType") int commandType);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
index d5829de216..5aa7465303 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
@@ -21,4 +21,5 @@ public interface CoronationTaskMapper extends BaseMapper<CoronationTask> {
int queryAllCoronationTaskNumber();
+ int deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
index 3bd1045682..7dd14299fd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
@@ -23,4 +23,5 @@ public interface IsolationTaskMapper extends BaseMapper<IsolationTask> {
List<IsolationTask> queryAllIsolationTask();
+ int deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
index 1893e8ffd6..f747f8bb47 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
@@ -25,4 +25,5 @@ public interface CoronationTaskDao {
int queryAllCoronationTaskNumber();
+ int deleteByWorkflowInstanceId(Integer processInstanceId);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
index b38b30569a..e9757cf63b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
@@ -30,4 +30,6 @@ public interface IsolationTaskDao {
void insert(IsolationTask isolationTaskDTO);
void batchInsert(List<IsolationTask> isolationTasks);
+
+ int deleteByWorkflowInstanceId(Integer processInstanceId);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
index 2365b5f2bb..f66e78d4bd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
@@ -26,7 +26,7 @@ public class CommandDaoImpl implements CommandDao {
@Override
public List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(long workflowInstanceId) {
- return commandMapper.queryRecoveryCoronationCommandByWorkflowInstanceId(workflowInstanceId,
+ return commandMapper.queryCommandByWorkflowInstanceIdAndCommandType(workflowInstanceId,
CommandType.RECOVERY_FROM_CORONATION_PAUSE_TASKS.getCode());
}
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
index 893b28fbbb..b2892781de 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
@@ -62,4 +62,9 @@ public class CoronationTaskDaoImpl implements CoronationTaskDao {
public int queryAllCoronationTaskNumber() {
return coronationTaskMapper.queryAllCoronationTaskNumber();
}
+
+ @Override
+ public int deleteByWorkflowInstanceId(Integer workflowInstanceId) {
+ return coronationTaskMapper.deleteByWorkflowInstanceId(workflowInstanceId);
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
index 0608adba0b..57d5b31de4 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
@@ -71,4 +71,9 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
}
isolationTaskMapper.batchInsert(isolationTasks);
}
+
+ @Override
+ public int deleteByWorkflowInstanceId(Integer workflowInstanceId) {
+ return isolationTaskMapper.deleteByWorkflowInstanceId(workflowInstanceId);
+ }
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index 043a2827b6..15083ba52c 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -80,6 +80,7 @@
#{command.taskDependType},
#{command.failureStrategy},
#{command.warningType},
+ #{command.warningGroupId},
#{command.scheduleTime},
#{command.startTime},
#{command.executorId},
@@ -91,4 +92,12 @@
)
</foreach>
</insert>
+
+ <select id="queryCommandByWorkflowInstanceIdAndCommandType"
+ resultType="org.apache.dolphinscheduler.dao.entity.Command">
+ select *
+ from t_ds_command
+ where process_instance_id = #{workflowInstanceId}
+ and command_type = #{commandType}
+ </select>
</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
index 0967fa71ec..fc7bfc9660 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
@@ -66,4 +66,10 @@
</foreach>
</insert>
+ <delete id="deleteByWorkflowInstanceId">
+ delete
+ from t_ds_coronation_task
+ where workflow_instance_id = #{workflowInstanceId}
+ </delete>
+
</mapper>
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
index 390591be26..a6f72bddc2 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
@@ -78,4 +78,10 @@
</foreach>
</insert>
+ <delete id="deleteByWorkflowInstanceId">
+ delete
+ from t_ds_isolation_task
+ where workflow_instance_id = #{workflowInstanceId}
+ </delete>
+
</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index f4c0623b70..7fda26c0ad 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -234,6 +234,12 @@
</foreach>
order by id asc
</select>
+ <select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_instance
+ where state = #{state}
+ </select>
<select id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 7c63a84e2c..87a4a7c0ab 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -42,9 +42,9 @@
<select id="queryTaskByProcessIdAndState" resultType="java.lang.Integer">
select id
from t_ds_task_instance
- WHERE process_instance_id = #{processInstanceId}
- and state = #{state}
- and flag = 1
+ WHERE process_instance_id = #{processInstanceId}
+ and state = #{state}
+ and flag = 1
</select>
<select id="findValidTaskListByProcessId" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
@@ -54,6 +54,15 @@
and flag = #{flag}
order by start_time desc
</select>
+ <select id="findValidTaskListByProcessIdAndTaskStatus"
+ resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_task_instance
+ WHERE process_instance_id = #{processInstanceId}
+ and state = #{status}
+ and flag = #{flag}
+ </select>
<select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
index 1a88860490..2faeee6f5a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
@@ -18,7 +18,7 @@ public class RefreshCoronationMetadataProcessor implements NettyRequestProcessor
@Override
public void process(Channel channel, Command command) {
- if (command.getType() != CommandType.REFRESH_ISOLATION_METADATA_REQUEST) {
+ if (command.getType() != CommandType.REFRESH_CORONATION_METADATA_REQUEST) {
throw new IllegalArgumentException(String.format("The current rpc command : %s is invalidated", command));
}
coronationMetadataManager.refreshCoronationTaskMetadata();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
index 3522ac4fec..f56aef464a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
@@ -33,4 +33,8 @@ public class MasterRPCClient {
client.sendSync(host, rpcCommand, timeoutMills);
}
+ public void sendCommand(@NonNull Host host, @NonNull Command rpcCommand) throws RemotingException {
+ client.send(host, rpcCommand);
+ }
+
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 4644611a7f..3aba5c6bde 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -74,6 +74,7 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired
private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
+ @Autowired
private RefreshCoronationMetadataProcessor refreshCoronationMetadataProcessor;
@Autowired
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index b284c80ef2..ebe2000626 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -1655,6 +1655,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
// No task need to submit, and exist isolation task, the workflow instance need to be PAUSE_BY_ISOLATION
return ExecutionStatus.PAUSE_BY_ISOLATION;
}
+ if (coronationMetadataManager.isInCoronationMode()) {
+ Optional<TaskInstance> pauseByCoronationTaskInstance = taskInstanceMap.values().stream()
+ .filter(taskInstance -> taskInstance.getState().typeIsPauseByCoronation())
+ .findAny();
+ if (pauseByCoronationTaskInstance.isPresent()) {
+ return ExecutionStatus.PAUSE_BY_CORONATION;
+ }
+ }
+
// if the waiting queue is empty and the status is in progress, then success
return ExecutionStatus.SUCCESS;
}
@@ -1999,7 +2008,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
coronattedTaskCodeToTimesMap.getOrDefault(taskCode, 0) + 1);
Integer parentNodeInstanceId = validTaskMap.get(parentNodeCode);
if (parentNodeInstanceId != null) {
- TaskInstance taskInstance = activeTaskProcessorMaps.get(parentNodeInstanceId).taskInstance();
+ ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(parentNodeInstanceId);
+ if (iTaskProcessor == null) {
+ continue;
+ }
+ TaskInstance taskInstance = iTaskProcessor.taskInstance();
if (taskInstance.getState().typeIsPauseByCoronation()) {
// resubmit the task to standbylist, this task will be resubmit again.
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
index 68aba94bda..d698c3f20b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
@@ -94,10 +94,10 @@ public class CoronationMetadataManager {
if (coronationTaskInMemory.isEmpty()) {
if (coronationMode == CoronationMode.IN_CORONATION) {
log.info("There is not coronation tasks, will begin to close coronation mode...");
- closeCoronation();
coronationMode = CoronationMode.NOT_IN_CORONATION;
log.info("Close coronation mode success...");
}
+ insertRecoveryCoronationCommandIfNeeded();
} else {
addCoronationTasks(addCoronationTasks);
cancelCoronationTasks(deleteCoronationTasks);
@@ -107,7 +107,7 @@ public class CoronationMetadataManager {
}
}
stopWatch.stop();
- log.info("Refresh coronation task from DB finished, cost: {}", stopWatch.getTime());
+ log.info("Refresh coronation task from DB finished, cost: {} ms", stopWatch.getTime());
}
public boolean isCoronationTask(int workflowInstanceId, long taskCode) {
@@ -135,7 +135,7 @@ public class CoronationMetadataManager {
RefreshCoronationMetadataRequest request = new RefreshCoronationMetadataRequest();
for (Server master : masters) {
try {
- masterRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
+ masterRPCClient.sendCommand(new Host(master.getHost(), master.getPort()),
request.convert2Command());
} catch (Exception e) {
log.error(
@@ -151,19 +151,19 @@ public class CoronationMetadataManager {
}
}
- private void closeCoronation() {
+ private void insertRecoveryCoronationCommandIfNeeded() {
// The current server is in coronation mode, need to close coronation.
// Need to acquire a lock to guarantee there is only one master recovery the pause_by_coronation workflow
+ // block to acquire the master lock
try {
- // block to acquire the master lock
if (!registryClient.getLock(NodeType.MASTER.getRegistryPath())) {
- log.error("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath());
+ log.warn("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath());
return;
}
// find the all instance that need to be recovery
// create recovery command
List<Command> needToInsertCommand =
- processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_ISOLATION)
+ processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_CORONATION)
.stream()
.filter(processInstance -> {
List<Command> commands = commandDao
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 43000df020..38d6535f1d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1633,7 +1633,8 @@ public class ProcessServiceImpl implements ProcessService {
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
|| state == ExecutionStatus.DISPATCH
- || state == ExecutionStatus.PAUSE_BY_ISOLATION) {
+ || state == ExecutionStatus.PAUSE_BY_ISOLATION
+ || state == ExecutionStatus.PAUSE_BY_CORONATION) {
return state;
}
// return pasue /stop if process instance state is ready pause / stop