You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/11/01 01:30:27 UTC
[dolphinscheduler] 02/05: Support coronation task (#182)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 350f1b3825c3ac3dda0a4b79d571ee6b42a606ba
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Oct 28 14:54:17 2022 +0800
Support coronation task (#182)
* Implement isolate task api interface
* Add coronation task
---
.../api/checker/CoronationTaskChecker.java | 38 +++
.../api/checker/CoronationTaskCheckerImpl.java | 99 ++++++++
.../api/checker/IsolationTaskChecker.java | 9 -
.../api/checker/IsolationTaskCheckerImpl.java | 55 -----
.../constants/ApiFuncIdentificationConstant.java | 7 +-
.../api/controller/CoronationTaskController.java | 91 +++++++
.../api/controller/IsolationTaskController.java | 39 +--
.../dto/request/CoronationTaskListingRequest.java | 24 ++
.../dto/request/CoronationTaskParseRequest.java | 18 ++
.../dto/request/CoronationTaskSubmitRequest.java | 16 ++
.../apache/dolphinscheduler/api/enums/Status.java | 13 +
.../api/remote/ApiServerRPCClient.java | 5 +
.../api/service/CoronationTaskService.java | 30 +++
.../api/service/IsolationTaskService.java | 14 +-
.../service/impl/CoronationTaskServiceImpl.java | 205 ++++++++++++++++
.../api/service/impl/IsolationTaskServiceImpl.java | 98 +++-----
.../api/service/impl/TaskInstanceServiceImpl.java | 3 +-
.../api/vo/CoronationTaskExcelImportVO.java | 27 ++
.../api/vo/CoronationTaskParseVO.java | 34 +++
.../api/vo/IsolationTaskListingVO.java | 12 +-
.../apache/dolphinscheduler/common/Constants.java | 3 +
.../dolphinscheduler/common/enums/CommandType.java | 2 +
.../dolphinscheduler/common/utils/JSONUtils.java | 1 +
.../dao/dto/CoronationTaskDTO.java | 50 ++++
.../dao/dto/IsolationTaskStatus.java | 27 --
.../dao/dto/TaskRuntimeContext.java | 20 ++
.../dao/dto/TaskSimpleInfoDTO.java | 15 ++
.../{IsolationTask.java => CoronationTask.java} | 6 +-
.../dolphinscheduler/dao/entity/IsolationTask.java | 2 -
.../dolphinscheduler/dao/entity/TaskInstance.java | 10 +
.../dolphinscheduler/dao/mapper/CommandMapper.java | 3 +
.../dao/mapper/CoronationTaskMapper.java | 24 ++
.../dao/mapper/IsolationTaskMapper.java | 18 +-
.../dao/mapper/ProcessInstanceMapper.java | 2 +
.../dao/mapper/TaskInstanceMapper.java | 4 +
.../dao/repository/CommandDao.java | 2 +
.../dao/repository/CoronationTaskDao.java | 28 +++
.../dao/repository/IsolationTaskDao.java | 14 +-
.../dao/repository/ProcessDefinitionDao.java | 4 +-
.../dao/repository/ProcessInstanceDao.java | 5 +-
.../dao/repository/TaskInstanceDao.java | 4 +
.../dao/repository/impl/CommandDaoImpl.java | 7 +
.../dao/repository/impl/CoronationTaskDaoImpl.java | 65 +++++
.../dao/repository/impl/IsolationTaskDaoImpl.java | 32 +--
.../repository/impl/ProcessDefinitionDaoImpl.java | 6 +-
.../repository/impl/ProcessInstanceDaoImpl.java | 11 +-
.../dao/repository/impl/TaskInstanceDaoImpl.java | 8 +
.../dao/mapper/CoronationTaskMapper.xml | 69 ++++++
.../dao/mapper/IsolationTaskMapper.xml | 32 +--
.../dao/mapper/TaskInstanceMapper.xml | 2 +-
.../src/main/resources/sql/dolphinscheduler_h2.sql | 19 +-
.../main/resources/sql/dolphinscheduler_mysql.sql | 25 +-
.../RefreshCoronationMetadataProcessor.java} | 14 +-
...java => RefreshIsolationMetadataProcessor.java} | 10 +-
.../server/master/rpc/MasterRPCClient.java | 7 +-
.../server/master/rpc/MasterRPCServer.java | 14 +-
.../master/runner/FailoverExecuteThread.java | 11 +-
.../master/runner/MasterSchedulerBootstrap.java | 11 +-
.../master/runner/WorkflowExecuteRunnable.java | 208 ++++++++++++----
.../master/service/CoronationMetadataManager.java | 273 +++++++++++++++++++++
.../master/service/IsolationMetadataManager.java | 136 ++++++++++
.../master/service/IsolationTaskManager.java | 97 --------
.../transformer/MasterCommandTransformer.java | 14 ++
.../transformer/MasterCommandTransformerImpl.java | 45 ++++
.../server/master/utils/WorkflowInstanceUtils.java | 18 ++
.../master/runner/WorkflowExecuteRunnableTest.java | 46 ++--
.../remote/command/CommandType.java | 4 +-
.../RefreshCoronationMetadataRequest.java} | 7 +-
...t.java => RefreshIsolationMetadataRequest.java} | 8 +-
.../service/process/ProcessServiceImpl.java | 17 +-
.../plugin/task/api/enums/ExecutionStatus.java | 8 +-
71 files changed, 1815 insertions(+), 490 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskChecker.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskChecker.java
new file mode 100644
index 0000000000..b44feb9556
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskChecker.java
@@ -0,0 +1,38 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.List;
+
+public interface CoronationTaskChecker {
+
+ void checkCanParseCoronationTask(@NonNull User loginUser,
+ @NonNull Project project,
+ ProcessInstance processInstance,
+ @NonNull CoronationTaskExcelImportVO vo) throws ServiceException;
+
+ void checkCanSubmitTaskCoronation(@NonNull User loginUser,
+ @NonNull Project project,
+ @NonNull ProcessInstance processInstance,
+ @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG,
+ @NonNull List<CoronationTaskParseVO> voList);
+
+ void checkCanListingTaskCoronation(@NonNull User loginUser,
+ long projectCode);
+
+ void checkCanCancelTaskCoronation(@NonNull User loginUser,
+ long projectCode,
+ @NonNull ProcessInstance processInstance,
+ @NonNull CoronationTask coronationTask);
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskCheckerImpl.java
new file mode 100644
index 0000000000..23a6200adb
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/CoronationTaskCheckerImpl.java
@@ -0,0 +1,99 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.CORONATION_TASK_CANCEL;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.CORONATION_TASK_PARSE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.CORONATION_TASK_VIEW;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+
+import java.util.List;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class CoronationTaskCheckerImpl implements CoronationTaskChecker {
+
+ @Autowired
+ private ProcessDefinitionDao processDefinitionDao;
+
+ @Autowired
+ private ProjectService projectService;
+
+ @Override
+ public void checkCanParseCoronationTask(@NonNull User loginUser,
+ @NonNull Project project,
+ @NonNull ProcessInstance processInstance,
+ @NonNull CoronationTaskExcelImportVO vo) throws ServiceException {
+ ProcessDefinition processDefinition =
+ processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode())
+ .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
+ processInstance.getProcessDefinitionCode()));
+ if (processDefinition.getProjectCode() != project.getCode()) {
+ throw new ServiceException(Status.PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
+ }
+ projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CORONATION_TASK_PARSE);
+ }
+
+ @Override
+ public void checkCanSubmitTaskCoronation(@NonNull User loginUser,
+ @NonNull Project project,
+ @NonNull ProcessInstance processInstance,
+ @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG,
+ @NonNull List<CoronationTaskParseVO> voList) throws ServiceException {
+ ProcessDefinition processDefinition =
+ processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode())
+ .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
+ processInstance.getProcessDefinitionCode()));
+
+ if (processDefinition.getProjectCode() != project.getCode()) {
+ throw new ServiceException(Status.PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
+ }
+ projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CORONATION_TASK_PARSE);
+ // todo: check if the vos can submit coronation tasks
+ // if a coronation task already exists in db or an upstream task contains a coronation task, we cannot submit
+
+ }
+
+ @Override
+ public void checkCanListingTaskCoronation(@NonNull User loginUser,
+ long projectCode) {
+ Project project = projectService.queryByCode(loginUser, projectCode);
+ projectService.checkProjectAndAuth(loginUser, project, projectCode, CORONATION_TASK_VIEW);
+ }
+
+ @Override
+ public void checkCanCancelTaskCoronation(@NonNull User loginUser, long projectCode,
+ @NonNull ProcessInstance processInstance,
+ @NonNull CoronationTask coronationTask) {
+ ProcessDefinition processDefinition =
+ processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode())
+ .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
+ processInstance.getProcessDefinitionCode()));
+
+ if (processDefinition.getProjectCode() != projectCode) {
+ throw new ServiceException(Status.PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
+ }
+
+ Project project = projectService.queryByCode(loginUser, projectCode);
+ projectService.checkProjectAndAuth(loginUser, project, projectCode, CORONATION_TASK_CANCEL);
+ }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
index 9ee19f10be..919464290c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
@@ -15,18 +15,9 @@ public interface IsolationTaskChecker {
ProcessInstance processInstance,
@NonNull List<IsolationTaskExcelParseVO> voList);
- void checkCanOnlineTaskIsolation(@NonNull User loginUser,
- long projectCode,
- @NonNull ProcessInstance processInstance,
- @NonNull IsolationTask isolationTask);
-
void checkCanListingTaskIsolation(@NonNull User loginUser,
long projectCode);
- void checkCanDeleteTaskIsolation(@NonNull User loginUser,
- long projectCode,
- long isolationId);
-
void checkCanCancelTaskIsolation(@NonNull User loginUser,
long projectCode,
ProcessInstance processInstance,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
index 99402f7fb6..d5d724db85 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
@@ -9,7 +9,6 @@ import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -26,14 +25,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_CANCEL;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_LIST;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_ONLINE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_SUBMIT;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST;
@@ -79,22 +72,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
checkTaskNodeCanSubmitIsolateTask(voList, workflowDAG);
}
- @Override
- public void checkCanOnlineTaskIsolation(@NonNull User loginUser,
- long projectCode,
- ProcessInstance processInstance,
- @NonNull IsolationTask isolationTask) {
- Project project = projectService.queryByCode(loginUser, projectCode);
- checkOnlineIsolationTaskAuth(loginUser, project, processInstance);
-
- if (IsolationTaskStatus.ONLINE.getCode() == isolationTask.getStatus()) {
- throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE);
- }
-
- String workflowInstanceName = isolationTask.getWorkflowInstanceName();
- checkWorkflowInstanceCanSubmitIsolateTask(workflowInstanceName, processInstance);
- }
-
@Override
public void checkCanListingTaskIsolation(@NonNull User loginUser, long projectCode) {
Project project = projectService.queryByCode(loginUser, projectCode);
@@ -102,20 +79,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_LIST);
}
- @Override
- public void checkCanDeleteTaskIsolation(@NonNull User loginUser,
- long projectCode,
- long isolationId) {
- Project project = projectService.queryByCode(loginUser, projectCode);
- projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_DELETE);
-
- IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
- .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
- if (isolationTask.getStatus() != IsolationTaskStatus.OFFLINE.getCode()) {
- throw new ServiceException(ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE);
- }
- }
-
@Override
public void checkCanCancelTaskIsolation(@NonNull User loginUser,
long projectCode,
@@ -127,9 +90,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
Project project = projectService.queryByCode(loginUser, projectCode);
checkCancelIsolationTaskAuth(loginUser, project, processInstance);
- if (isolationTask.getStatus() == IsolationTaskStatus.OFFLINE.getCode()) {
- throw new ServiceException(ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL);
- }
}
private void checkWorkflowInstanceCanSubmitIsolateTask(@NonNull String workflowInstanceName,
@@ -173,21 +133,6 @@ public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_SUBMIT);
}
- private void checkOnlineIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
- if (processInstance == null) {
- throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
- }
- ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
- .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion())
- .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
- if (processDefinitionLog.getProjectCode() != project.getCode()) {
- throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
- }
-
- projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_ONLINE);
- }
-
private void checkCancelIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
if (processInstance == null) {
throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
index c8cdd2cb58..6bc0eb475a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.constants;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
+
import java.util.HashMap;
import java.util.Map;
@@ -97,10 +98,12 @@ public class ApiFuncIdentificationConstant {
public static final String FORCED_SUCCESS = "project:task-instance:force-success";
public static final String CLEAN_TASK_INSTANCE_STATE = "project:task-instance:clean-state";
public static final String ISOLATION_TASK_LIST = "project:isolation-task:list";
- public static final String ISOLATION_TASK_ONLINE = "project:isolation-task:online";
public static final String ISOLATION_TASK_SUBMIT = "project:isolation-task:submit";
public static final String ISOLATION_TASK_CANCEL = "project:isolation-task:cancel";
- public static final String ISOLATION_TASK_DELETE = "project:isolation-task:delete";
+
+ public static final String CORONATION_TASK_VIEW = "project:coronation-task:view";
+ public static final String CORONATION_TASK_PARSE = "project:coronation-task:parse";
+ public static final String CORONATION_TASK_CANCEL = "project:coronation-task:cancel";
public static final String VIEW_LOG = "project:log:detail";
public static final String DOWNLOAD_LOG = "project:log:download-log";
public static final String PROJECT_OVERVIEW = "project:overview:view";
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
new file mode 100644
index 0000000000..80ee7a1107
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java
@@ -0,0 +1,91 @@
+package org.apache.dolphinscheduler.api.controller;
+
+import io.swagger.annotations.Api;
+import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskParseRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.service.CoronationTaskService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+import springfox.documentation.annotations.ApiIgnore;
+
+import java.util.List;
+
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_CANCEL_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_DELETE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_LISTING_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_ONLINE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_PARSE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CORONATION_TASK_SUBMIT_ERROR;
+
+@Api(tags = "ISOLATION_TASK_TAG")
+@RestController
+@RequestMapping("/projects/{projectCode}/coronation-task")
+public class CoronationTaskController {
+
+ @Autowired
+ private CoronationTaskService coronationTaskService;
+
+ @PostMapping(value = "/parse")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(CORONATION_TASK_PARSE_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<List<CoronationTaskParseVO>> parseCoronationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @PathVariable("projectCode") long projectCode,
+ @RequestBody CoronationTaskParseRequest request) {
+ return Result.success(coronationTaskService.parseCoronationTask(loginUser, projectCode, request));
+ }
+
+ @PostMapping(value = "/submit")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(CORONATION_TASK_SUBMIT_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<Void> submitCoronationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @PathVariable("projectCode") long projectCode,
+ @RequestBody CoronationTaskSubmitRequest request) {
+ coronationTaskService.submitCoronationTask(loginUser, projectCode, request);
+ return Result.success(null);
+ }
+
+ @PutMapping(value = "/cancel/{id}")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(CORONATION_TASK_CANCEL_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<Void> cancelCoronationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @PathVariable("projectCode") long projectCode,
+ @PathVariable("id") long id) {
+ coronationTaskService.cancelCoronationTask(loginUser, projectCode, id);
+ return Result.success(null);
+ }
+
+ @GetMapping("")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(CORONATION_TASK_LISTING_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<PageInfo<CoronationTaskDTO>> listingCoronationTasks(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @PathVariable("projectCode") long projectCode,
+ @RequestBody CoronationTaskListingRequest request) {
+ return Result.success(coronationTaskService.listingCoronationTasks(loginUser, projectCode, request));
+
+ }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
index f4b445456d..ef9c404113 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
@@ -8,12 +8,12 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.IsolationTaskService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskListingVO;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
@@ -21,13 +21,11 @@ import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;
-import javax.validation.Valid;
-
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_LISTING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR;
@@ -50,17 +48,6 @@ public class IsolationTaskController {
return Result.success(null);
}
- @PutMapping(value = "/online/{id}")
- @ResponseStatus(HttpStatus.OK)
- @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
- @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
- public Result<Void> onlineIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @PathVariable long projectCode,
- @PathVariable(name = "id") long isolationId) {
- isolationTaskService.onlineTaskIsolation(loginUser, projectCode, isolationId);
- return Result.success(null);
- }
-
@PutMapping(value = "/cancel/{id}")
@ResponseStatus(HttpStatus.OK)
@ApiException(ISOLATION_TASK_SUBMIT_ERROR)
@@ -76,21 +63,15 @@ public class IsolationTaskController {
@ResponseStatus(HttpStatus.OK)
@ApiException(ISOLATION_TASK_LISTING_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
- public Result<PageInfo<IsolationTask>> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @PathVariable long projectCode,
- @RequestBody @Valid IsolationTaskListingRequest request) {
+ public Result<PageInfo<IsolationTaskListingVO>> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @PathVariable long projectCode,
+ @RequestParam(required = false) String workflowInstanceName,
+ @RequestParam(required = false) String taskName,
+ @RequestParam Integer pageNo,
+ @RequestParam Integer pageSize) {
+ IsolationTaskListingRequest request =
+ new IsolationTaskListingRequest(workflowInstanceName, taskName, pageNo, pageSize);
return Result.success(isolationTaskService.listingTaskIsolation(loginUser, projectCode, request));
}
- @DeleteMapping("/{id}")
- @ResponseStatus(HttpStatus.OK)
- @ApiException(ISOLATION_TASK_DELETE_ERROR)
- @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
- public Result<Void> deleteIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @PathVariable long projectCode,
- @PathVariable long id) {
- isolationTaskService.deleteTaskIsolation(loginUser, projectCode, id);
- return Result.success(null);
- }
-
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
new file mode 100644
index 0000000000..7ecded2c1c
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskListingRequest {
+
+ private String workflowInstanceName;
+
+ private String taskName;
+
+ @NotNull(message = "pageNo cannot be null")
+ private Integer pageNo;
+
+ @NotNull(message = "pageSize cannot be null")
+ private Integer pageSize;
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskParseRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskParseRequest.java
new file mode 100644
index 0000000000..4fa9c384de
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskParseRequest.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+
+import java.util.List;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskParseRequest {
+
+ private List<CoronationTaskExcelImportVO> coronationTasks;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
new file mode 100644
index 0000000000..2597a1206e
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskSubmitRequest {
+
+ private List<CoronationTaskParseVO> CoronationTasks;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 602cb477dd..71e462bbf9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -550,6 +550,19 @@ public enum Status {
ISOLATION_TASK_DELETE_ERROR(1500500, "Delete isolation task error", "删除隔离任务异常"),
ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE(1500501, "Delete isolation task error, the task status is not offline",
"删除隔离任务异常,该隔离任务尚未下线"),
+
+ CORONATION_TASK_PARSE_ERROR(1600000, "Coronation task parse error", "解析加冕任务异常"),
+ CORONATION_TASK_NOT_EXIST(1600001, "Coronation task not exist", "加冕任务不存在"),
+ CORONATION_TASK_PARSE_ERROR_TASK_NODE_NAME_IS_NOT_VALIDATED(1600002,
+ "Coronation task parse error, taskNode name is not validated", "解析加冕任务异常, 任务名不正确"),
+ CORONATION_TASK_SUBMIT_ERROR(1600100, "Coronation task submit error", "提交加冕任务异常"),
+ CORONATION_TASK_ONLINE_ERROR(1600200, "Coronation task online error", "上线加冕任务异常"),
+ CORONATION_TASK_CANCEL_ERROR(1600300, "Coronation task cancel error", "取消加冕任务异常"),
+ CORONATION_TASK_LISTING_ERROR(1600400, "Coronation task listing error", "查询加冕任务异常"),
+ CORONATION_TASK_DELETE_ERROR(1600500, "Coronation task delete error", "删除加冕任务异常"),
+
+ PROCESS_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT(1700000,
+ "The workflow instance doesn't belong to the current project", "工作流实例不属于当前项目"),
;
private final int code;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
index ef3634cd52..a0d82e4632 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
@@ -27,6 +27,11 @@ public class ApiServerRPCClient {
sendSyncCommand(host, rpcCommand, DEFAULT_TIME_OUT_MILLS);
}
+ /**
+  * Hands the command to the underlying client's {@code send(host, command)}.
+  * Unlike {@code sendSyncCommand}, this overload takes no timeout.
+  *
+  * @param host       target server
+  * @param rpcCommand command to send
+  * @throws RemotingException propagated from the underlying client
+  */
+ public void send(@NonNull Host host, @NonNull Command rpcCommand) throws RemotingException {
+     client.send(host, rpcCommand);
+ }
+
public void sendSyncCommand(@NonNull Host host,
@NonNull Command rpcCommand,
long timeoutMills) throws RemotingException, InterruptedException {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/CoronationTaskService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/CoronationTaskService.java
new file mode 100644
index 0000000000..a45a843d4a
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/CoronationTaskService.java
@@ -0,0 +1,30 @@
+package org.apache.dolphinscheduler.api.service;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskParseRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.List;
+
+/**
+ * Service operations for coronation tasks: parse, submit, list and cancel.
+ */
+public interface CoronationTaskService {
+
+    /**
+     * Parses the coronation task rows of the request.
+     *
+     * @param loginUser   user performing the operation
+     * @param projectCode code of the project the operation is executed in
+     * @param request     rows to parse
+     * @return one {@link CoronationTaskParseVO} per parsed row
+     */
+    List<CoronationTaskParseVO> parseCoronationTask(
+            @NonNull User loginUser,
+            long projectCode,
+            @NonNull CoronationTaskParseRequest request);
+
+    /**
+     * Submits the coronation tasks of the request.
+     *
+     * @param loginUser   user performing the operation
+     * @param projectCode code of the project the operation is executed in
+     * @param request     coronation tasks to submit
+     */
+    void submitCoronationTask(
+            @NonNull User loginUser,
+            long projectCode,
+            @NonNull CoronationTaskSubmitRequest request);
+
+    /**
+     * Lists coronation tasks page by page.
+     *
+     * @param loginUser   user performing the operation
+     * @param projectCode code of the project the operation is executed in
+     * @param request     paging and filter parameters
+     * @return the requested page of coronation tasks
+     */
+    PageInfo<CoronationTaskDTO> listingCoronationTasks(
+            @NonNull User loginUser,
+            long projectCode,
+            @NonNull CoronationTaskListingRequest request);
+
+    /**
+     * Cancels the coronation task with the given id.
+     *
+     * @param loginUser   user performing the operation
+     * @param projectCode code of the project the operation is executed in
+     * @param id          id of the coronation task to cancel
+     */
+    void cancelCoronationTask(@NonNull User loginUser, long projectCode, long id);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
index 7225d19007..16cd6ec225 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
@@ -5,6 +5,7 @@ import org.apache.dolphinscheduler.api.dto.request.IsolationTaskCancelRequest;
import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskListingVO;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -14,19 +15,12 @@ public interface IsolationTaskService {
long projectCode,
@NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest);
- void onlineTaskIsolation(@NonNull User loginUser,
- long projectCode,
- long isolationTaskId);
-
void cancelTaskIsolation(@NonNull User loginUser,
long projectCode,
long isolationId);
- PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
- long projectCode,
- @NonNull IsolationTaskListingRequest request);
+ PageInfo<IsolationTaskListingVO> listingTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ @NonNull IsolationTaskListingRequest request);
- void deleteTaskIsolation(@NonNull User loginUser,
- long projectCode,
- long id);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
new file mode 100644
index 0000000000..a6c0691c8b
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java
@@ -0,0 +1,205 @@
+package org.apache.dolphinscheduler.api.service.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.api.checker.CoronationTaskChecker;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskParseRequest;
+import org.apache.dolphinscheduler.api.dto.request.CoronationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.remote.ApiServerRPCClient;
+import org.apache.dolphinscheduler.api.service.CoronationTaskService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO;
+import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.dto.TaskSimpleInfoDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.coronation.RefreshCoronationMetadataRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link CoronationTaskService} implementation.
+ * Parses, submits, lists and cancels coronation tasks. After a submit or a cancel, every master
+ * found in the registry is asked to refresh its coronation metadata.
+ */
+@Slf4j
+@Service
+public class CoronationTaskServiceImpl implements CoronationTaskService {
+
+    @Autowired
+    private CoronationTaskChecker coronationTaskChecker;
+
+    @Autowired
+    private CoronationTaskDao coronationTaskDao;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private WorkflowDAGService workflowDAGService;
+
+    @Autowired
+    private RegistryClient registryClient;
+
+    @Autowired
+    private ApiServerRPCClient apiServerRPCClient;
+
+    /**
+     * Parses every row of the request into a {@link CoronationTaskParseVO}.
+     *
+     * @param loginUser   user performing the operation
+     * @param projectCode code of the project the operation is executed in
+     * @param request     rows to parse
+     * @return one parsed VO per row; an empty list when the request carries no rows
+     * @throws ServiceException if a workflow instance does not exist, or if the task code is not part
+     *                          of the workflow DAG, or if the task name does not match the DAG node
+     */
+    @Override
+    public List<CoronationTaskParseVO> parseCoronationTask(@NonNull User loginUser,
+                                                           long projectCode,
+                                                           @NonNull CoronationTaskParseRequest request) {
+        List<CoronationTaskExcelImportVO> coronationTasks = request.getCoronationTasks();
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        if (CollectionUtils.isEmpty(coronationTasks)) {
+            // Nothing to parse; avoids a NullPointerException when the list is absent.
+            return new ArrayList<>();
+        }
+        return coronationTasks.stream()
+                .map(vo -> parseSingleCoronationTask(loginUser, project, vo))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Submits the coronation tasks of the request: checks them per workflow instance, stores them and
+     * asks the masters to refresh their coronation metadata.
+     *
+     * @param loginUser   user performing the operation
+     * @param projectCode code of the project the operation is executed in
+     * @param request     coronation tasks to submit
+     * @throws ServiceException if a referenced workflow instance does not exist
+     */
+    @Override
+    public void submitCoronationTask(@NonNull User loginUser, long projectCode,
+                                     @NonNull CoronationTaskSubmitRequest request) {
+        Project project = projectService.queryByCode(loginUser, projectCode);
+        List<CoronationTaskParseVO> submittedTasks = request.getCoronationTasks();
+        if (CollectionUtils.isEmpty(submittedTasks)) {
+            // Nothing to store: skip the empty batch insert and the needless master refresh.
+            return;
+        }
+        Map<Integer, List<CoronationTaskParseVO>> coronationTaskMap = submittedTasks.stream()
+                .collect(Collectors.groupingBy(CoronationTaskParseVO::getWorkflowInstanceId));
+
+        List<CoronationTask> needToInsertIntoDB = new ArrayList<>();
+
+        for (Map.Entry<Integer, List<CoronationTaskParseVO>> entry : coronationTaskMap.entrySet()) {
+            Integer workflowInstanceId = entry.getKey();
+            List<CoronationTaskParseVO> vos = entry.getValue();
+
+            ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+                    .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
+            DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(
+                    workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion());
+            coronationTaskChecker.checkCanSubmitTaskCoronation(loginUser, project, workflowInstance, workflowDAG, vos);
+
+            for (CoronationTaskParseVO vo : vos) {
+                needToInsertIntoDB.add(buildCoronationTask(workflowDAG, vo));
+            }
+        }
+        coronationTaskDao.batchInsert(needToInsertIntoDB);
+        sendSyncCoronationTasksRequestToMaster();
+    }
+
+    /**
+     * Cancels a coronation task: after the checker accepted the operation the task is deleted and the
+     * masters are asked to refresh their coronation metadata.
+     *
+     * @param loginUser   user performing the operation
+     * @param projectCode code of the project the operation is executed in
+     * @param id          id of the coronation task to cancel
+     * @throws ServiceException if the coronation task or its workflow instance does not exist
+     */
+    @Override
+    public void cancelCoronationTask(@NonNull User loginUser, long projectCode, long id) {
+        CoronationTask coronationTask = coronationTaskDao.queryCoronationTaskById(id)
+                .orElseThrow(() -> new ServiceException(Status.CORONATION_TASK_NOT_EXIST));
+        ProcessInstance workflowInstance =
+                processInstanceDao.queryProcessInstanceById(coronationTask.getWorkflowInstanceId())
+                        .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
+        coronationTaskChecker.checkCanCancelTaskCoronation(loginUser, projectCode, workflowInstance, coronationTask);
+        coronationTaskDao.deleteById(id);
+        sendSyncCoronationTasksRequestToMaster();
+    }
+
+    /**
+     * Returns one page of coronation tasks, filtered by the workflow instance name and task name of the
+     * request.
+     *
+     * @param loginUser   user performing the operation
+     * @param projectCode code of the project the operation is executed in
+     * @param request     paging and filter parameters
+     * @return the requested page, each record converted to a {@link CoronationTaskDTO}
+     */
+    @Override
+    public PageInfo<CoronationTaskDTO> listingCoronationTasks(@NonNull User loginUser, long projectCode,
+                                                              @NonNull CoronationTaskListingRequest request) {
+        coronationTaskChecker.checkCanListingTaskCoronation(loginUser, projectCode);
+
+        Integer pageNo = request.getPageNo();
+        Integer pageSize = request.getPageSize();
+
+        IPage<CoronationTask> iPage = coronationTaskDao.pageQueryCoronationTask(
+                request.getWorkflowInstanceName(),
+                request.getTaskName(),
+                pageNo,
+                pageSize);
+
+        PageInfo<CoronationTaskDTO> pageInfo = new PageInfo<>(pageNo, pageSize);
+        pageInfo.setTotal((int) iPage.getTotal());
+        pageInfo.setTotalList(iPage.getRecords().stream().map(CoronationTaskDTO::new).collect(Collectors.toList()));
+
+        return pageInfo;
+    }
+
+    /** Parses a single imported row against the DAG of its workflow instance. */
+    private CoronationTaskParseVO parseSingleCoronationTask(User loginUser,
+                                                            Project project,
+                                                            CoronationTaskExcelImportVO vo) {
+        ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(vo.getWorkflowInstanceId())
+                .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
+        coronationTaskChecker.checkCanParseCoronationTask(loginUser, project, workflowInstance, vo);
+        DAG<String, TaskNode, TaskNodeRelation> workflowDAG =
+                workflowDAGService.getWorkflowDAG(workflowInstance.getProcessDefinitionCode(),
+                        workflowInstance.getProcessDefinitionVersion());
+
+        String taskCode = Long.toString(vo.getTaskCode());
+        TaskNode taskNode = workflowDAG.getNode(taskCode);
+        // A task code that is unknown to the DAG yields no node; report it as a validation error
+        // instead of failing with a NullPointerException. The same applies to a missing task name.
+        if (taskNode == null || vo.getTaskName() == null || !vo.getTaskName().equals(taskNode.getName())) {
+            throw new ServiceException(Status.CORONATION_TASK_PARSE_ERROR_TASK_NODE_NAME_IS_NOT_VALIDATED);
+        }
+
+        List<TaskSimpleInfoDTO> previousTaskNodeDTO = workflowDAG.getPreviousNodes(taskCode)
+                .stream()
+                .map(previousNodeCode -> toTaskSimpleInfo(workflowDAG, previousNodeCode))
+                .collect(Collectors.toList());
+        return CoronationTaskParseVO.builder()
+                .workflowInstanceId(workflowInstance.getId())
+                .workflowInstanceName(workflowInstance.getName())
+                .taskNode(vo.getTaskName())
+                .taskCode(vo.getTaskCode())
+                .upstreamTasks(previousTaskNodeDTO)
+                .build();
+    }
+
+    /** Builds the entity to store; upstream nodes that were not selected become forbidden upstream tasks. */
+    private CoronationTask buildCoronationTask(DAG<String, TaskNode, TaskNodeRelation> workflowDAG,
+                                               CoronationTaskParseVO vo) {
+        Set<String> previousNodes = workflowDAG.getPreviousNodes(vo.getTaskCode().toString());
+        Set<String> selectNodes = vo.getUpstreamTasks()
+                .stream()
+                .map(taskNode -> Long.toString(taskNode.getTaskCode()))
+                .collect(Collectors.toSet());
+        // The upstream node hasn't been selected will be set to forbidden execute
+        List<TaskSimpleInfoDTO> needToForbiddenTaskCodes = CollectionUtils.subtract(previousNodes, selectNodes)
+                .stream()
+                .map(taskNode -> toTaskSimpleInfo(workflowDAG, taskNode))
+                .collect(Collectors.toList());
+
+        return CoronationTask.builder()
+                .workflowInstanceId(vo.getWorkflowInstanceId())
+                .workflowInstanceName(vo.getWorkflowInstanceName())
+                .taskName(vo.getTaskNode())
+                .taskCode(vo.getTaskCode())
+                .forbiddenUpstreamTasks(JSONUtils.toJsonString(needToForbiddenTaskCodes))
+                .build();
+    }
+
+    /** Converts the DAG node registered under the given task code into its name/code pair. */
+    private static TaskSimpleInfoDTO toTaskSimpleInfo(DAG<String, TaskNode, TaskNodeRelation> workflowDAG,
+                                                      String taskCode) {
+        TaskNode node = workflowDAG.getNode(taskCode);
+        return new TaskSimpleInfoDTO(node.getName(), node.getCode());
+    }
+
+    /**
+     * Sends a {@link RefreshCoronationMetadataRequest} to every master listed in the registry.
+     * A failure for one master is logged and does not stop the remaining masters from being notified.
+     */
+    private void sendSyncCoronationTasksRequestToMaster() {
+        List<Server> masters = registryClient.getServerList(NodeType.MASTER);
+        if (CollectionUtils.isEmpty(masters)) {
+            return;
+        }
+        Command command = new RefreshCoronationMetadataRequest().convert2Command();
+        for (Server master : masters) {
+            try {
+                apiServerRPCClient.send(new Host(master.getHost(), master.getPort()), command);
+            } catch (RemotingException e) {
+                log.error("Send RefreshCoronationTask request to master error, master: {}", master, e);
+            }
+        }
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
index 4f6ff908b1..f3bdcf7600 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
@@ -16,12 +16,12 @@ import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
import org.apache.dolphinscheduler.api.transformer.CommandTransformer;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskListingVO;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
+import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationMetadataRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -49,7 +49,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
-import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR;
@Slf4j
@Service
@@ -83,7 +82,6 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
private TaskInstanceDao taskInstanceDao;
@Override
- @Transactional
public void submitTaskIsolations(@NonNull User loginUser,
long projectCode,
@NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
@@ -100,7 +98,8 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
for (Map.Entry<Integer, List<IsolationTaskExcelParseVO>> entry : workflow2VoMap.entrySet()) {
Integer workflowInstanceId = entry.getKey();
List<IsolationTaskExcelParseVO> vos = entry.getValue();
- ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+ ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+ .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
isolationTaskChecker.checkCanSubmitTaskIsolation(loginUser, projectCode, processInstance, vos);
List<IsolationTask> isolationTasks = entry.getValue().stream().map(vo -> {
@@ -109,36 +108,13 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
.workflowInstanceName(vo.getWorkflowInstanceName())
.taskName(vo.getTaskName())
.taskCode(vo.getTaskCode())
- .status(IsolationTaskStatus.ONLINE.getCode())
.build();
}).collect(Collectors.toList());
needToInsertIntoDB.addAll(isolationTasks);
needToOnlineIsolations.add(Pair.of(processInstance, isolationTasks));
}
isolationTaskDao.batchInsert(needToInsertIntoDB);
- // we split here to avoid rollback RPC request
- try {
- refreshIsolationTasks();
- } catch (Exception ex) {
- throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR);
- }
- }
-
- @Override
- @Transactional
- public void onlineTaskIsolation(@NonNull User loginUser, long projectCode, long isolationTaskId) {
- IsolationTask isolationTask = isolationTaskDao.queryById(isolationTaskId)
- .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
- ProcessInstance processInstance =
- processInstanceDao.queryProcessInstanceById(isolationTask.getWorkflowInstanceId());
-
- isolationTaskChecker.checkCanOnlineTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
- isolationTaskDao.updateIsolationTaskStatus(isolationTaskId, IsolationTaskStatus.ONLINE);
- try {
- refreshIsolationTasks();
- } catch (Exception ex) {
- throw new ServiceException(Status.ISOLATION_TASK_ONLINE_ERROR);
- }
+ sendIsolationTaskRefreshRequestToMaster();
}
@Override
@@ -150,21 +126,18 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
.orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
Integer workflowInstanceId = isolationTask.getWorkflowInstanceId();
- ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+ ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+ .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
isolationTaskChecker.checkCanCancelTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
- isolationTaskDao.updateIsolationTaskStatus(isolationTask.getId(), IsolationTaskStatus.OFFLINE);
- insertRecoveryCommandIfNeeded(processInstance, isolationTask);
- try {
- refreshIsolationTasks();
- } catch (RemotingException | InterruptedException e) {
- throw new ServiceException(Status.ISOLATION_TASK_CANCEL_ERROR);
- }
+ isolationTaskDao.deleteById(isolationTask.getId());
+ insertRecoveryCommandIfNeeded(processInstance);
+ sendIsolationTaskRefreshRequestToMaster();
}
@Override
- public PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
- long projectCode,
- @NonNull IsolationTaskListingRequest request) {
+ public PageInfo<IsolationTaskListingVO> listingTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ @NonNull IsolationTaskListingRequest request) {
isolationTaskChecker.checkCanListingTaskIsolation(loginUser, projectCode);
Integer pageNo = request.getPageNo();
@@ -175,46 +148,48 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
request.getTaskName(),
pageNo,
pageSize);
-
- PageInfo<IsolationTask> pageInfo = new PageInfo<>(pageNo, pageSize);
+ List<IsolationTaskListingVO> isolationTaskListingVOList = iPage.getRecords()
+ .stream()
+ .map(isolationTask -> {
+ return IsolationTaskListingVO.builder()
+ .id(isolationTask.getId())
+ .workflowInstanceId(isolationTask.getWorkflowInstanceId())
+ .workflowInstanceName(isolationTask.getWorkflowInstanceName())
+ // todo: set task status
+ .taskName(isolationTask.getTaskName())
+ .taskCode(isolationTask.getTaskCode())
+ .createTime(isolationTask.getCreateTime())
+ .updateTime(isolationTask.getUpdateTime())
+ .build();
+ }).collect(Collectors.toList());
+ PageInfo<IsolationTaskListingVO> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotal((int) iPage.getTotal());
- pageInfo.setTotalList(iPage.getRecords());
+ pageInfo.setTotalList(isolationTaskListingVOList);
return pageInfo;
}
- @Override
- public void deleteTaskIsolation(@NonNull User loginUser, long projectCode, long id) {
- isolationTaskChecker.checkCanDeleteTaskIsolation(loginUser, projectCode, id);
- int deleteNum = isolationTaskDao.deleteByIdAndStatus(id, IsolationTaskStatus.OFFLINE);
- if (deleteNum <= 0) {
- throw new ServiceException(ISOLATION_TASK_NOT_EXIST);
- }
- }
-
- private void refreshIsolationTasks() throws RemotingException, InterruptedException {
+ private void sendIsolationTaskRefreshRequestToMaster() {
List<Server> masters = registryClient.getServerList(NodeType.MASTER);
if (CollectionUtils.isEmpty(masters)) {
return;
}
org.apache.dolphinscheduler.remote.command.Command refreshIsolationRequest =
- new RefreshIsolationTaskRequest().convert2Command();
+ new RefreshIsolationMetadataRequest().convert2Command();
for (Server master : masters) {
try {
- apiServerRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
- refreshIsolationRequest);
- } catch (RemotingException | InterruptedException e) {
+ apiServerRPCClient.send(new Host(master.getHost(), master.getPort()), refreshIsolationRequest);
+ } catch (RemotingException e) {
log.error("Send RefreshIsolationTask request to master error, master: {}", master, e);
- throw e;
}
}
}
- private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance,
- @NonNull IsolationTask isolationTask) {
+ private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance) {
if (processInstance.getState() != ExecutionStatus.PAUSE_BY_ISOLATION) {
return;
}
+ log.info("The current workflow instance is in PAUSE_BY_ISOLATION status, will insert a recovery command");
int workflowInstanceId = processInstance.getId();
// find the isolationTaskInstanceIds need to recovery
// find the sub node is in pause or kill
@@ -225,7 +200,7 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
List<TaskInstance> taskInstances =
taskInstanceDao.queryValidatedTaskInstanceByWorkflowInstanceId(workflowInstanceId);
Set<String> onlineIsolationTaskCodes =
- isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId, IsolationTaskStatus.ONLINE)
+ isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId)
.stream()
.map(onlineIsolationTask -> String.valueOf(onlineIsolationTask.getTaskCode()))
.collect(Collectors.toSet());
@@ -236,6 +211,7 @@ public class IsolationTaskServiceImpl implements IsolationTaskService {
onlineIsolationTaskCodes, workflowDAG))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(canRecoveryTaskInstances)) {
+ log.error("The current workflow instance has no task instance can recovery");
return;
}
// find if this taskInstance still exist pre isolationTasks
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 9d1dc166a2..6c5b49b94d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -242,7 +242,8 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
.entrySet()) {
Integer workflowInstanceId = processInstanceId2TaskInstanceIds.getKey();
List<Integer> needToCleanStateTaskInstanceIds = processInstanceId2TaskInstanceIds.getValue();
- ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+ ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId)
+ .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST));
workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, workflowInstance);
Command command = commandTransformer.transformToCleanTaskInstanceStateCommand(workflowInstance,
needToCleanStateTaskInstanceIds);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskExcelImportVO.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskExcelImportVO.java
new file mode 100644
index 0000000000..dc21207513
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskExcelImportVO.java
@@ -0,0 +1,27 @@
+package org.apache.dolphinscheduler.api.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * One imported coronation task row: the workflow instance and the task it refers to.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class CoronationTaskExcelImportVO {
+
+    /** Id of the workflow instance the task belongs to. */
+    @NotNull(message = "Workflow instance id cannot be null")
+    private Integer workflowInstanceId;
+
+    /** Name of the workflow instance the task belongs to. */
+    @NotNull(message = "Workflow instance name cannot be null")
+    private String workflowInstanceName;
+
+    /** Code of the task. */
+    @NotNull(message = "Task code cannot be null")
+    private Long taskCode;
+
+    /** Name of the task. */
+    @NotNull(message = "Task name cannot be null")
+    private String taskName;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskParseVO.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskParseVO.java
new file mode 100644
index 0000000000..e52379b2cd
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/CoronationTaskParseVO.java
@@ -0,0 +1,34 @@
+package org.apache.dolphinscheduler.api.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.dao.dto.TaskSimpleInfoDTO;
+
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+
+/**
+ * A parsed coronation task: the workflow instance, the task and the task's upstream tasks.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskParseVO {
+
+    /** Name of the workflow instance the task belongs to. */
+    @NotNull(message = "workflow instance name cannot be null")
+    private String workflowInstanceName;
+
+    /** Id of the workflow instance the task belongs to. */
+    @NotNull(message = "workflow instance id cannot be null")
+    private Integer workflowInstanceId;
+
+    /** Name of the task node. */
+    @NotNull(message = "taskNode cannot be null")
+    private String taskNode;
+
+    /** Code of the task node. */
+    @NotNull(message = "taskCode cannot be null")
+    private Long taskCode;
+
+    /** Upstream tasks of the task node; the constraint rejects both null and an empty list. */
+    @NotEmpty(message = "upstream task nodes cannot be empty")
+    private List<TaskSimpleInfoDTO> upstreamTasks;
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskListingVO.java
similarity index 54%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskListingVO.java
index 8a852aace5..94b69cc9fb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskListingVO.java
@@ -1,8 +1,5 @@
-package org.apache.dolphinscheduler.dao.entity;
+package org.apache.dolphinscheduler.api.vo;
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -14,10 +11,8 @@ import java.util.Date;
@Builder
@NoArgsConstructor
@AllArgsConstructor
-@TableName("t_ds_isolation_task")
-public class IsolationTask {
+public class IsolationTaskListingVO {
- @TableId(value = "id", type = IdType.AUTO)
private Long id;
private int workflowInstanceId;
@@ -28,10 +23,9 @@ public class IsolationTask {
private long taskCode;
- private int status;
+ private String taskStatus;
private Date createTime;
private Date updateTime;
-
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index efffbc3842..a851b6dc5d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -341,6 +341,9 @@ public final class Constants {
public static final String CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS = "RecoveryPausedIsolationTaskInstanceIds";
public static final String CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS = "RecoveryKilledIsolationTaskInstanceIds";
+ public static final String CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS =
+ "RecoveryPausedByCoronationTaskInstanceIds";
+
public static final String CMD_PARAM_START_PARAMS = "StartParams";
public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 62d538a58b..ca30317808 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -57,6 +57,8 @@ public enum CommandType {
RECOVERY_FROM_ISOLATION_TASKS(13, "recovery from isolation tasks"),
+ RECOVERY_FROM_CORONATION_PAUSE_TASKS(14, "recovery from coronation pause tasks"),
+
;
CommandType(int code, String descp) {
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index d31c92780c..255c4f833f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -72,6 +72,7 @@ public class JSONUtils {
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+ .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
.setTimeZone(TimeZone.getDefault())
.setDateFormat(new SimpleDateFormat(Constants.YYYY_MM_DD_HH_MM_SS));
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/CoronationTaskDTO.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/CoronationTaskDTO.java
new file mode 100644
index 0000000000..df22a2fbcf
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/CoronationTaskDTO.java
@@ -0,0 +1,50 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+
+import java.util.Date;
+import java.util.List;
+/** Read-side view of a {@link CoronationTask} row with its forbidden upstream task list already parsed from JSON. */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoronationTaskDTO {
+ // Field order matters: Lombok's @AllArgsConstructor takes its parameters in declaration order.
+ private Long id;
+ /** Workflow instance id (mirrors the entity field of the same name). */
+ private int workflowInstanceId;
+ /** Workflow instance name (mirrors the entity field of the same name). */
+ private String workflowInstanceName;
+ /** Task name (mirrors the entity field of the same name). */
+ private String taskName;
+ /** Task code (mirrors the entity field of the same name). */
+ private long taskCode;
+ /** Typed form of the entity's forbiddenUpstreamTasks value, which the entity keeps as a JSON string. */
+ private List<TaskSimpleInfoDTO> forbiddenUpstreamTasks;
+ /** Creation time (mirrors the entity field of the same name). */
+ private Date createTime;
+ /** Last update time (mirrors the entity field of the same name). */
+ private Date updateTime;
+ /** Copies every field of the entity; the argument is guarded by Lombok's {@code @NonNull} null check. */
+ public CoronationTaskDTO(@NonNull CoronationTask coronationTask) {
+ this.id = coronationTask.getId();
+ this.workflowInstanceId = coronationTask.getWorkflowInstanceId();
+ this.workflowInstanceName = coronationTask.getWorkflowInstanceName();
+ this.taskName = coronationTask.getTaskName();
+ this.taskCode = coronationTask.getTaskCode();
+ this.forbiddenUpstreamTasks = JSONUtils.parseObject(coronationTask.getForbiddenUpstreamTasks(),
+ new TypeReference<List<TaskSimpleInfoDTO>>() {
+ }); // NOTE(review): what parseObject yields for a null or blank string is not visible here - confirm a null list is acceptable
+ this.createTime = coronationTask.getCreateTime();
+ this.updateTime = coronationTask.getUpdateTime();
+ }
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
deleted file mode 100644
index dcc93cc64b..0000000000
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.dolphinscheduler.dao.dto;
-
-public enum IsolationTaskStatus {
-
- ONLINE(0),
- OFFLINE(1),
- ;
-
- private final int code;
-
- IsolationTaskStatus(int code) {
- this.code = code;
- }
-
- public int getCode() {
- return code;
- }
-
- public static IsolationTaskStatus of(int code) {
- for (IsolationTaskStatus value : IsolationTaskStatus.values()) {
- if (value.getCode() == code) {
- return value;
- }
- }
- throw new IllegalArgumentException(String.format("Isolation task status code: %s is invalidated: ", code));
- }
-}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskRuntimeContext.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskRuntimeContext.java
new file mode 100644
index 0000000000..ea454ce543
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskRuntimeContext.java
@@ -0,0 +1,20 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+/** Boolean flags relating a task to isolation and coronation. NOTE(review): presumably serialized into TaskInstance#runtimeContext - confirm. */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class TaskRuntimeContext {
+ // NOTE(review): for "is"-prefixed boolean fields Lombok names the accessors isIsolationTask()/setIsolationTask(..) - confirm the resulting JSON property names are intended.
+ private boolean hasBeenIsolated;
+ private boolean isIsolationTask;
+ // NOTE(review): "Coronatted" looks like a misspelling of "Coronated"; it is part of the Lombok-generated accessor names, so renaming needs a coordinated change.
+ private boolean hasBeenCoronatted;
+ private boolean isCoronationTask;
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
new file mode 100644
index 0000000000..9fe05ca9ca
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+/** Name/code pair identifying a task; used as the element type of CoronationTaskDTO#forbiddenUpstreamTasks. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskSimpleInfoDTO {
+ // NOTE(review): nothing in this class uses the imported ExecutionStatus - the import looks removable.
+ private String taskName;
+ private long taskCode;
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/CoronationTask.java
similarity index 86%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
copy to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/CoronationTask.java
index 8a852aace5..a7b72203bb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/CoronationTask.java
@@ -14,8 +14,8 @@ import java.util.Date;
@Builder
@NoArgsConstructor
@AllArgsConstructor
-@TableName("t_ds_isolation_task")
-public class IsolationTask {
+@TableName("t_ds_coronation_task")
+public class CoronationTask {
@TableId(value = "id", type = IdType.AUTO)
private Long id;
@@ -28,7 +28,7 @@ public class IsolationTask {
private long taskCode;
- private int status;
+ private String forbiddenUpstreamTasks;
private Date createTime;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
index 8a852aace5..a7aaf4b359 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
@@ -28,8 +28,6 @@ public class IsolationTask {
private long taskCode;
- private int status;
-
private Date createTime;
private Date updateTime;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 3e1d8cef0f..23ad1de3f0 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -276,6 +276,8 @@ public class TaskInstance implements Serializable {
*/
private int taskGroupId;
+ private String runtimeContext;
+
public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;
@@ -758,4 +760,12 @@ public class TaskInstance implements Serializable {
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
+ /** Returns the raw runtime context string. NOTE(review): presumably the JSON form of TaskRuntimeContext - confirm. */
+ public String getRuntimeContext() {
+ return runtimeContext;
+ }
+ /** Replaces the raw runtime context string; the value is stored as given. */
+ public void setRuntimeContext(String runtimeContext) {
+ this.runtimeContext = runtimeContext;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index c957260a3c..fd1c8d7204 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -57,4 +57,7 @@ public interface CommandMapper extends BaseMapper<Command> {
@Param("thisMasterSlot") int thisMasterSlot);
void batchInsertCommand(List<Command> commands);
+ // NOTE(review): "command_type" is snake_case while the sibling binding is camelCase - confirm it matches the placeholder used in CommandMapper.xml.
+ List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(@Param("workflowInstanceId") long workflowInstanceId,
+ @Param("command_type") int commandType);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
new file mode 100644
index 0000000000..d5829de216
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.dao.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+/** MyBatis-Plus mapper for {@code t_ds_coronation_task}; the SQL of the custom statements lives in CoronationTaskMapper.xml. */
+public interface CoronationTaskMapper extends BaseMapper<CoronationTask> {
+ /** Pages coronation tasks; a null or empty workflowInstanceName/taskName disables that exact-match filter. The page's element type is left open (it used to demand a page of IsolationTask). */
+ IPage<CoronationTask> pageQueryCoronationTask(IPage<?> page,
+ @Param("workflowInstanceName") String workflowInstanceName,
+ @Param("taskName") String taskName);
+ /** Batch insert of the given rows; CoronationTaskDaoImpl skips the call for a null or empty list. */
+ void batchInsert(@Param("coronationTasks") List<CoronationTask> coronationTasks);
+ /** Returns every row of the table. */
+ List<CoronationTask> queryAllCoronationTasks();
+ /** Returns the number of rows in the table. */
+ int queryAllCoronationTaskNumber();
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
index babb00c5bb..3bd1045682 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
@@ -10,23 +10,17 @@ import java.util.List;
public interface IsolationTaskMapper extends BaseMapper<IsolationTask> {
- IPage<IsolationTask> pageQuery(@Param("workflowInstanceName") String workflowInstanceName,
- @Param("taskName") String taskName,
- IPage<IsolationTask> page);
+ IPage<IsolationTask> pageQuery(IPage<IsolationTask> page,
+ @Param("workflowInstanceName") String workflowInstanceName,
+ @Param("taskName") String taskName);
List<IsolationTask> queryByTaskCodes(@NonNull @Param("workflowInstanceId") Integer workflowInstanceId,
@NonNull @Param("taskCodes") List<Long> taskCodes);
- void updateIsolationTaskStatus(@Param("id") long isolationTaskId,
- @Param("status") int status);
-
- int deleteByIdAndStatus(@Param("id") long isolationTaskId,
- @Param("status") int status);
-
- List<IsolationTask> queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId,
- @Param("status") int status);
+ List<IsolationTask> queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
void batchInsert(@Param("isolationTasks") List<IsolationTask> isolationTasks);
- List<IsolationTask> queryByStatus(@Param("status") int code);
+ List<IsolationTask> queryAllIsolationTask();
+
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index a2ec345fa0..cf97f42712 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -258,4 +258,6 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("state") int state, @Param("id") int id);
+
+ List<ProcessInstance> queryByStatus(@Param("state") int code);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index cf282e1359..23a47177e9 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -39,6 +39,10 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
List<TaskInstance> findValidTaskListByProcessId(@Param("processInstanceId") Integer processInstanceId,
@Param("flag") Flag flag);
+ List<TaskInstance> findValidTaskListByProcessIdAndTaskStatus(@Param("processInstanceId") Integer processInstanceId,
+ @Param("status") int status,
+ @Param("flag") int flag);
+
List<TaskInstance> queryByHostAndStatus(@Param("host") String host,
@Param("states") int[] stateArray);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
index 278b922082..17445b8e6f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
@@ -7,4 +7,6 @@ import java.util.List;
public interface CommandDao {
void batchInsertCommand(List<Command> commands);
+
+ List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(long workflowInstanceId);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
new file mode 100644
index 0000000000..1893e8ffd6
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java
@@ -0,0 +1,28 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+
+import java.util.List;
+import java.util.Optional;
+/** Repository facade over the coronation task table. */
+public interface CoronationTaskDao {
+ /** Pages coronation tasks, optionally filtered by workflow instance name and task name; pageNo and pageSize are marked non-null. */
+ IPage<CoronationTask> pageQueryCoronationTask(String workflowInstanceName,
+ String taskName,
+ @NonNull Integer pageNo,
+ @NonNull Integer pageSize);
+ /** Returns all coronation tasks converted to DTOs. */
+ List<CoronationTaskDTO> queryAllCoronationTasks();
+ /** Looks up one coronation task by id; CoronationTaskDaoImpl returns an empty Optional when the mapper yields null. */
+ Optional<CoronationTask> queryCoronationTaskById(long id);
+ /** Deletes by id and returns the count reported by the mapper. */
+ int deleteById(long id);
+ /** Inserts the given tasks; CoronationTaskDaoImpl does nothing for a null or empty list. */
+ void batchInsert(List<CoronationTask> coronationTasks);
+ /** Returns the total number of coronation tasks. */
+ int queryAllCoronationTaskNumber();
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
index de16a92be0..b38b30569a 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
@@ -2,12 +2,10 @@ package org.apache.dolphinscheduler.dao.repository;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.NonNull;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
public interface IsolationTaskDao {
@@ -19,21 +17,17 @@ public interface IsolationTaskDao {
int pageNumber,
int pageSize);
+ List<IsolationTask> queryAllIsolationTask();
+
List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes);
- List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId, IsolationTaskStatus isolationTaskStatus);
+ List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId);
Optional<IsolationTask> queryById(long isolationTaskId);
- List<IsolationTask> queryByIds(List<Long> isolationTaskIds);
-
- List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus);
-
- int deleteByIdAndStatus(long id, IsolationTaskStatus status);
+ int deleteById(long id);
void insert(IsolationTask isolationTaskDTO);
- void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus);
-
void batchInsert(List<IsolationTask> isolationTasks);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
index bd6e140951..7cde663fa7 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
@@ -3,7 +3,9 @@ package org.apache.dolphinscheduler.dao.repository;
import lombok.NonNull;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import java.util.Optional;
+
public interface ProcessDefinitionDao {
- ProcessDefinition queryProcessDefinitionByCode(@NonNull Long processDefinitionCode);
+ Optional<ProcessDefinition> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index e789101783..a069340c96 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -19,14 +19,16 @@ package org.apache.dolphinscheduler.dao.repository;
import lombok.NonNull;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.util.List;
+import java.util.Optional;
public interface ProcessInstanceDao {
List<ProcessInstance> queryProcessInstanceByIds(List<Integer> processInstanceIds);
- ProcessInstance queryProcessInstanceById(@NonNull Integer processInstanceId);
+ Optional<ProcessInstance> queryProcessInstanceById(@NonNull Integer processInstanceId);
int insertProcessInstance(ProcessInstance processInstance);
@@ -39,4 +41,5 @@ public interface ProcessInstanceDao {
*/
int upsertProcessInstance(ProcessInstance processInstance);
+ List<ProcessInstance> queryProcessInstanceByStatus(@NonNull ExecutionStatus executionStatus); // filters by the given state; any ExecutionStatus may be passed, not only an isolation pause
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index 3918ad8386..971258499d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -3,6 +3,7 @@ package org.apache.dolphinscheduler.dao.repository;
import lombok.NonNull;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.exception.RepositoryException;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.util.List;
@@ -12,6 +13,9 @@ public interface TaskInstanceDao {
List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId);
+ List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceIdAndStatus(@NonNull Integer workflowInstanceId,
+ @NonNull ExecutionStatus status);
+
/**
* Update the taskInstance, if update failed will throw exception.
*
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
index 79297445d1..2365b5f2bb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
@@ -1,6 +1,7 @@
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
@@ -22,4 +23,10 @@ public class CommandDaoImpl implements CommandDao {
}
commandMapper.batchInsertCommand(commands);
}
+ /** Asks the mapper for the RECOVERY_FROM_CORONATION_PAUSE_TASKS commands of the given workflow instance. */
+ @Override
+ public List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(long workflowInstanceId) {
+ final int recoveryCommandTypeCode = CommandType.RECOVERY_FROM_CORONATION_PAUSE_TASKS.getCode();
+ return commandMapper.queryRecoveryCoronationCommandByWorkflowInstanceId(workflowInstanceId, recoveryCommandTypeCode);
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
new file mode 100644
index 0000000000..893b28fbbb
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java
@@ -0,0 +1,65 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.CoronationTask;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.mapper.CoronationTaskMapper;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+/** Mapper-backed implementation of {@link CoronationTaskDao}. */
+@Slf4j
+@Repository
+public class CoronationTaskDaoImpl implements CoronationTaskDao {
+
+ @Autowired
+ private CoronationTaskMapper coronationTaskMapper;
+ /** Builds a MyBatis-Plus page from pageNo/pageSize and delegates the filtered query to the mapper. */
+ @Override
+ public IPage<CoronationTask> pageQueryCoronationTask(String workflowInstanceName, String taskName,
+ @NonNull Integer pageNo,
+ @NonNull Integer pageSize) {
+ Page<IsolationTask> page = new Page<>(pageNo, pageSize); // NOTE(review): this page carries CoronationTask rows; the IsolationTask type argument looks like a copy/paste leftover
+ return coronationTaskMapper.pageQueryCoronationTask(page, workflowInstanceName, taskName);
+ }
+ /** Loads every row through the mapper and wraps each one in a {@link CoronationTaskDTO}. */
+ @Override
+ public List<CoronationTaskDTO> queryAllCoronationTasks() {
+ return coronationTaskMapper.queryAllCoronationTasks()
+ .stream()
+ .map(CoronationTaskDTO::new)
+ .collect(Collectors.toList());
+ }
+ /** Wraps the mapper's selectById result, so a null result becomes an empty Optional. */
+ @Override
+ public Optional<CoronationTask> queryCoronationTaskById(long id) {
+ return Optional.ofNullable(coronationTaskMapper.selectById(id));
+ }
+ /** Deletes by id and returns the count reported by the mapper. */
+ @Override
+ public int deleteById(long id) {
+ return coronationTaskMapper.deleteById(id);
+ }
+ /** Forwards the list to the mapper's batch insert; a null or empty list is skipped without touching the mapper. */
+ @Override
+ public void batchInsert(List<CoronationTask> coronationTasks) {
+ if (CollectionUtils.isEmpty(coronationTasks)) {
+ return;
+ }
+ coronationTaskMapper.batchInsert(coronationTasks);
+ }
+ /** Returns the number of coronation tasks as counted by the mapper. */
+ @Override
+ public int queryAllCoronationTaskNumber() {
+ return coronationTaskMapper.queryAllCoronationTaskNumber();
+ }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
index effab3aa16..0608adba0b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
@@ -5,7 +5,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper;
import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
@@ -29,7 +28,12 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
int pageNumber,
int pageSize) {
Page<IsolationTask> page = new Page<>(pageNumber, pageSize);
- return isolationTaskMapper.pageQuery(workflowInstanceName, taskName, page);
+ return isolationTaskMapper.pageQuery(page, workflowInstanceName, taskName);
+ }
+
+ @Override
+ public List<IsolationTask> queryAllIsolationTask() {
+ return isolationTaskMapper.queryAllIsolationTask();
}
@Override
@@ -41,9 +45,8 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
}
@Override
- public List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId,
- IsolationTaskStatus isolationTaskStatus) {
- return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId, isolationTaskStatus.getCode());
+ public List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId) {
+ return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId);
}
@Override
@@ -52,18 +55,8 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
}
@Override
- public List<IsolationTask> queryByIds(List<Long> isolationTaskIds) {
- return isolationTaskMapper.selectBatchIds(isolationTaskIds);
- }
-
- @Override
- public List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus) {
- return isolationTaskMapper.queryByStatus(isolationTaskStatus.getCode());
- }
-
- @Override
- public int deleteByIdAndStatus(long id, IsolationTaskStatus status) {
- return isolationTaskMapper.deleteByIdAndStatus(id, status.getCode());
+ public int deleteById(long id) {
+ return isolationTaskMapper.deleteById(id);
}
@Override
@@ -71,11 +64,6 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao {
isolationTaskMapper.insert(isolationTask);
}
- @Override
- public void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus) {
- isolationTaskMapper.updateIsolationTaskStatus(isolationTaskId, isolationTaskStatus.getCode());
- }
-
@Override
public void batchInsert(List<IsolationTask> isolationTasks) {
if (CollectionUtils.isEmpty(isolationTasks)) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
index 72dce0b2c3..637b10ad5d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
@@ -7,6 +7,8 @@ import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
+import java.util.Optional;
+
@Repository
public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
@@ -14,7 +16,7 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
private ProcessDefinitionMapper processDefinitionMapper;
@Override
- public ProcessDefinition queryProcessDefinitionByCode(@NonNull Long processDefinitionCode) {
- return processDefinitionMapper.queryByCode(processDefinitionCode);
+ public Optional<ProcessDefinition> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode) {
+ return Optional.ofNullable(processDefinitionMapper.queryByCode(processDefinitionCode));
}
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index 84ab280c7f..338566e67e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -23,11 +23,13 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
@Slf4j
@Repository
@@ -45,8 +47,8 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
}
@Override
- public ProcessInstance queryProcessInstanceById(@NonNull Integer processInstanceId) {
- return processInstanceMapper.selectById(processInstanceId);
+ public Optional<ProcessInstance> queryProcessInstanceById(@NonNull Integer processInstanceId) {
+ return Optional.ofNullable(processInstanceMapper.selectById(processInstanceId));
}
@Override
@@ -67,4 +69,9 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
return insertProcessInstance(processInstance);
}
}
+
+ @Override
+ public List<ProcessInstance> queryProcessInstanceByStatus(@NonNull ExecutionStatus executionStatus) {
+ return processInstanceMapper.queryByStatus(executionStatus.getCode());
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index 0512980b10..9b07088810 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -8,6 +8,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.exception.RepositoryException;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
@@ -34,6 +35,13 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
return taskInstanceMapper.findValidTaskListByProcessId(workflowInstanceId, Flag.YES);
}
+ @Override
+ public List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceIdAndStatus(@NonNull Integer workflowInstanceId,
+ @NonNull ExecutionStatus status) {
+ final int validFlagCode = Flag.YES.getCode(); // the mapper takes plain int codes rather than the enums
+ return taskInstanceMapper.findValidTaskListByProcessIdAndTaskStatus(workflowInstanceId, status.getCode(), validFlagCode);
+ }
+
@Override
public void updateTaskInstance(@NonNull TaskInstance taskInstance) throws RepositoryException {
try {
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
new file mode 100644
index 0000000000..0967fa71ec
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.CoronationTaskMapper">
+ <sql id="baseSql">
+ id
+ ,workflow_instance_id, workflow_instance_name, task_name, task_code, forbidden_upstream_tasks, create_time, update_time
+ </sql>
+
+ <select id="pageQueryCoronationTask" resultType="org.apache.dolphinscheduler.dao.entity.CoronationTask">
+ select
+ <include refid="baseSql"/>
+ from t_ds_coronation_task
+ where 1 = 1
+ <if test="workflowInstanceName != null and workflowInstanceName != ''">
+ and workflow_instance_name = #{workflowInstanceName}
+ </if>
+ <if test="taskName != null and taskName != ''">
+ and task_name = #{taskName}
+ </if>
+ </select>
+
+ <select id="queryAllCoronationTasks" resultType="org.apache.dolphinscheduler.dao.entity.CoronationTask">
+ select
+ <include refid="baseSql"/>
+ from t_ds_coronation_task
+ </select>
+
+ <select id="queryAllCoronationTaskNumber" resultType="java.lang.Integer">
+ select count(1) from t_ds_coronation_task
+ </select>
+
+ <insert id="batchInsert">
+ insert into t_ds_coronation_task (
+ workflow_instance_id,
+ workflow_instance_name,
+ task_name,
+ task_code,
+ forbidden_upstream_tasks
+ )
+ values
+ <foreach collection="coronationTasks" item="coronationTask" separator=",">
+ (
+ #{coronationTask.workflowInstanceId},
+ #{coronationTask.workflowInstanceName},
+ #{coronationTask.taskName},
+ #{coronationTask.taskCode},
+ #{coronationTask.forbiddenUpstreamTasks}
+ )
+ </foreach>
+ </insert>
+
+</mapper>
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
index 4a7100010e..390591be26 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
@@ -20,18 +20,18 @@
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper">
<sql id="baseSql">
id
- ,workflow_instance_id, workflow_instance_name, task_name, task_code, status, create_time, update_time
+ ,workflow_instance_id, workflow_instance_name, task_name, task_code, create_time, update_time
</sql>
- <select id="pageQuery" resultType="string">
+ <select id="pageQuery" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
select
<include refid="baseSql"/>
from t_ds_isolation_task
where 1 = 1
- <if test="workflowInstanceName != null">
+ <if test="workflowInstanceName != null and workflowInstanceName != ''">
and workflow_instance_name = #{workflowInstanceName}
</if>
- <if test="taskName != null">
+ <if test="taskName != null and taskName != ''">
and task_name = #{taskName}
</if>
</select>
@@ -51,29 +51,21 @@
select
<include refid="baseSql"/>
from t_ds_isolation_task
- where workflow_instance_id = #{workflowInstanceId} and status = #{status}
+ where workflow_instance_id = #{workflowInstanceId}
</select>
- <select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+ <select id="queryAllIsolationTask" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
select
<include refid="baseSql"/>
from t_ds_isolation_task
- where status = #{status}
</select>
- <update id="updateIsolationTaskStatus">
- update t_ds_isolation_task
- set status = #{status}
- where id = #{id}
- </update>
-
<insert id="batchInsert">
insert into t_ds_isolation_task (
workflow_instance_id,
workflow_instance_name,
task_name,
- task_code,
- status
+ task_code
)
values
<foreach collection="isolationTasks" item="isolationTask" separator=",">
@@ -81,17 +73,9 @@
#{isolationTask.workflowInstanceId},
#{isolationTask.workflowInstanceName},
#{isolationTask.taskName},
- #{isolationTask.taskCode},
- #{isolationTask.status}
+ #{isolationTask.taskCode}
)
</foreach>
</insert>
- <delete id="deleteByIdAndStatus">
- delete
- from t_ds_isolation_task
- where id = #{id}
- and status = #{status}
- </delete>
-
</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 336b966487..7c63a84e2c 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -22,7 +22,7 @@
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
- first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id
+ first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id, runtime_context
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index 8f6df6738c..134a260a45 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -864,6 +864,7 @@ CREATE TABLE t_ds_task_instance
task_group_id int(11) DEFAULT NULL,
var_pool longtext,
dry_run int NULL DEFAULT 0,
+ runtime_context text,
PRIMARY KEY (id),
FOREIGN KEY (process_instance_id) REFERENCES t_ds_process_instance (id) ON DELETE CASCADE
);
@@ -1989,8 +1990,24 @@ CREATE TABLE `t_ds_isolation_task`
`workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
`task_name` varchar(255) NOT NULL COMMENT 'isolation task name',
`task_code` bigint NOT NULL COMMENT 'isolation task code',
- `status` tinyint(4) DEFAULT '0',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
);
+
+-- ----------------------------
+-- Table structure for t_ds_coronation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_coronation_task`;
+CREATE TABLE `t_ds_coronation_task`
+(
+ `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+ `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the coronation task',
+ `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the coronation task',
+ `task_name` varchar(255) NOT NULL COMMENT 'coronation task name',
+ `task_code` bigint NOT NULL COMMENT 'coronation task code',
+ `forbidden_upstream_tasks` text COMMENT 'The task in forbidden_upstream_tasks will be in forbidden execute list',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+ `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ PRIMARY KEY (`id`)
+);
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index e5b69db827..d202084ec2 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -344,7 +344,8 @@ CREATE TABLE `t_ds_command` (
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run',
PRIMARY KEY (`id`),
- KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE
+ KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE,
+ KEY `workflow_instance_id_index` (`process_instance_id`,`command_type`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
@@ -859,6 +860,7 @@ CREATE TABLE `t_ds_task_instance` (
`var_pool` longtext COMMENT 'var_pool',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag: 0 normal, 1 dry run',
+ `runtime_context` text COMMENT 'used to store some information used in runtime',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE,
@@ -1956,9 +1958,26 @@ CREATE TABLE `t_ds_isolation_task`
`workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
`task_name` varchar(255) NOT NULL COMMENT 'isolation task name',
`task_code` bigint NOT NULL COMMENT 'isolation task code',
- `status` tinyint(4) DEFAULT '0',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
- KEY `workflow_instance_index` (`workflow_instance_id`) USING BTREE
+ UNIQUE KEY `workflow_instance_task__unique` (`workflow_instance_id`,`task_code`)
+) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
+
+-- ----------------------------
+-- Table structure for t_ds_coronation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_coronation_task`;
+CREATE TABLE `t_ds_coronation_task`
+(
+ `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+ `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the coronation task',
+ `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the coronation task',
+ `task_name` varchar(255) NOT NULL COMMENT 'coronation task name',
+ `task_code` bigint NOT NULL COMMENT 'coronation task code',
+ `forbidden_upstream_tasks` text COMMENT 'The task in forbidden_upstream_tasks will be in forbidden execute list',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+ `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `workflow_instance_task__unique` (`workflow_instance_id`,`task_code`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
similarity index 52%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
index 826a456fd8..1a88860490 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java
@@ -1,26 +1,31 @@
-package org.apache.dolphinscheduler.server.master.processor.isolation;
+package org.apache.dolphinscheduler.server.master.processor.coronation;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+/**
+ * Netty request processor that asks {@link CoronationMetadataManager} to refresh the coronation task
+ * metadata when a {@code REFRESH_CORONATION_METADATA_REQUEST} command arrives.
+ */
@Slf4j
@Component
-public class RefreshIsolationTaskProcessor implements NettyRequestProcessor {
+public class RefreshCoronationMetadataProcessor implements NettyRequestProcessor {
@Autowired
- private IsolationTaskManager isolationTaskManager;
+ private CoronationMetadataManager coronationMetadataManager;
@Override
public void process(Channel channel, Command command) {
- if (command.getType() != CommandType.REFRESH_ISOLATION_REQUEST) {
- throw new IllegalArgumentException(String.format("The current rpc command: %s is invalidated", command));
+ // Reject any command other than the coronation refresh request this processor handles.
+ if (command.getType() != CommandType.REFRESH_CORONATION_METADATA_REQUEST) {
+ throw new IllegalArgumentException(String.format("The current rpc command : %s is invalidated", command));
}
- isolationTaskManager.refreshIsolationTaskMapFromDB();
+ coronationMetadataManager.refreshCoronationTaskMetadata();
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationMetadataProcessor.java
similarity index 71%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationMetadataProcessor.java
index 826a456fd8..19dd54211e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationMetadataProcessor.java
@@ -5,22 +5,22 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
-public class RefreshIsolationTaskProcessor implements NettyRequestProcessor {
+public class RefreshIsolationMetadataProcessor implements NettyRequestProcessor {
@Autowired
- private IsolationTaskManager isolationTaskManager;
+ private IsolationMetadataManager isolationTaskManager;
@Override
public void process(Channel channel, Command command) {
- if (command.getType() != CommandType.REFRESH_ISOLATION_REQUEST) {
+ if (command.getType() != CommandType.REFRESH_ISOLATION_METADATA_REQUEST) {
throw new IllegalArgumentException(String.format("The current rpc command: %s is invalidated", command));
}
- isolationTaskManager.refreshIsolationTaskMapFromDB();
+ isolationTaskManager.refreshIsolationTaskMetadata();
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
similarity index 91%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
index ef3634cd52..3522ac4fec 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.api.remote;
+package org.apache.dolphinscheduler.server.master.rpc;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -11,13 +11,13 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
-public class ApiServerRPCClient {
+public class MasterRPCClient {
private final NettyRemotingClient client;
private static final long DEFAULT_TIME_OUT_MILLS = 1_000L;
- public ApiServerRPCClient() {
+ public MasterRPCClient() {
client = new NettyRemotingClient(new NettyClientConfig());
- log.info("Success initialized ApiServerRPCClient...");
+ log.info("Success initialized MasterRPCClient...");
}
@@ -32,4 +32,5 @@ public class ApiServerRPCClient {
long timeoutMills) throws RemotingException, InterruptedException {
client.sendSync(host, rpcCommand, timeoutMills);
}
+
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index eb4d82cd3d..4644611a7f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -31,9 +30,8 @@ import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningPro
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
-import javax.annotation.PostConstruct;
-
-import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationTaskProcessor;
+import org.apache.dolphinscheduler.server.master.processor.coronation.RefreshCoronationMetadataProcessor;
+import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationMetadataProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -76,8 +74,11 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired
private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
+ @Autowired
+ private RefreshCoronationMetadataProcessor refreshCoronationMetadataProcessor;
+
@Autowired
- private RefreshIsolationTaskProcessor refreshIsolationTaskProcessor;
+ private RefreshIsolationMetadataProcessor refreshIsolationTaskProcessor;
public void start() {
logger.info("Starting Master RPC Server...");
@@ -101,8 +102,10 @@ public class MasterRPCServer implements AutoCloseable {
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_REQUEST,
+ this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_METADATA_REQUEST,
refreshIsolationTaskProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_CORONATION_METADATA_REQUEST,
+ refreshCoronationMetadataProcessor);
this.nettyRemotingServer.start();
logger.info("Started Master RPC Server...");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 6446248c06..a72b39612d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -22,7 +22,8 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,10 @@ public class FailoverExecuteThread extends BaseDaemonThread {
private MasterFailoverService masterFailoverService;
@Autowired
- private IsolationTaskManager isolationTaskManager;
+ private IsolationMetadataManager isolationMetadataManager;
+
+ @Autowired
+ protected CoronationMetadataManager coronationMetadataManager;
protected FailoverExecuteThread() {
super("FailoverExecuteThread");
@@ -70,7 +74,8 @@ public class FailoverExecuteThread extends BaseDaemonThread {
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a master server start
masterFailoverService.checkMasterFailover();
- isolationTaskManager.refreshIsolationTaskMapFromDB();
+ isolationMetadataManager.refreshIsolationTaskMetadata();
+ coronationMetadataManager.refreshCoronationTaskMetadata();
} catch (Exception e) {
logger.error("Master failover thread execute error", e);
} finally {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 1d823a71a7..45427fb402 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@@ -46,6 +45,8 @@ import org.apache.dolphinscheduler.server.master.exception.MasterException;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -110,7 +111,10 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
private StateEventCallbackService stateEventCallbackService;
@Autowired
- private IsolationTaskDao isolationTaskDao;
+ private CoronationMetadataManager coronationMetadataManager;
+
+ @Autowired
+ private IsolationMetadataManager isolationMetadataManager;
private String masterAddress;
@@ -189,7 +193,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
masterConfig,
stateWheelExecuteThread,
curingGlobalParamsService,
- isolationTaskDao);
+ isolationMetadataManager,
+ coronationMetadataManager);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 24d5d31451..b284c80ef2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -37,10 +37,10 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.dto.TaskRuntimeContext;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
-import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
@@ -49,7 +49,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@@ -68,6 +67,8 @@ import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
import org.apache.dolphinscheduler.server.master.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@@ -191,8 +192,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final Set<String> stateCleanTaskCodes = new HashSet<>();
- // isolationTaskCode -> isolatedTimes
- private final Map<Long, Integer> isolationTaskCodesToTimesMap = new HashMap<>();
+ // TaskCode -> isolatedTimes
+ // This map stores the tasks which will be isolated, and the number of times each has been isolated.
+ private final Map<Long, Integer> isolatedTaskCodesToTimesMap = new HashMap<>();
+ // TaskCode -> coronatedTimes
+ private final Map<Long, Integer> coronattedTaskCodeToTimesMap = new HashMap<>();
/**
* state event queue
@@ -214,7 +218,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final CuringParamsService curingParamsService;
- private final IsolationTaskDao isolationTaskDao;
+ private final IsolationMetadataManager isolationMetadataManager;
+ private final CoronationMetadataManager coronationMetadataManager;
private final String masterAddress;
@@ -236,7 +241,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
@NonNull MasterConfig masterConfig,
@NonNull StateWheelExecuteThread stateWheelExecuteThread,
@NonNull CuringParamsService curingParamsService,
- @NonNull IsolationTaskDao isolationTaskDao) {
+ @NonNull IsolationMetadataManager isolationMetadataManager,
+ @NonNull CoronationMetadataManager coronationMetadataManager) {
this.processService = processService;
this.processInstanceDao = processInstanceDao;
this.processInstance = processInstance;
@@ -244,8 +250,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
this.curingParamsService = curingParamsService;
- this.isolationTaskDao = isolationTaskDao;
+ this.isolationMetadataManager = isolationMetadataManager;
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
+ this.coronationMetadataManager = coronationMetadataManager;
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
}
@@ -393,6 +400,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
}
+ if (coronationMetadataManager.isInCoronationMode()
+ && coronattedTaskCodeToTimesMap.containsKey(taskInstance.getTaskCode())) {
+ // The current task is coronation task and is success
+ // we need to delete the coronation metadata in db, and send request to all master
+ coronationMetadataManager.deleteCoronationMetadataInDB(taskInstance.getTaskCode());
+ coronattedTaskCodeToTimesMap.remove(taskInstance.getTaskCode());
+ }
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
// retry task
logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
@@ -453,31 +467,35 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
- public void onlineTaskIsolation(long taskCode) {
+ public void addTaskIsolation(long taskCode) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
// find the post running task
- Set<String> needToOnlineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
- // if the current task is finished, kill the post task
- // we need to submit an online isolation task event, otherwise there may exist concurrent problem
- for (String isolationTaskCodeStr : needToOnlineIsolationTaskCodes) {
- Long isolationTaskCode = Long.valueOf(isolationTaskCodeStr);
- isolationTaskCodesToTimesMap.put(isolationTaskCode,
- isolationTaskCodesToTimesMap.getOrDefault(isolationTaskCode, 0) + 1);
- ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
- if (iTaskProcessor == null) {
- logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}",
- isolationTaskCode);
- continue;
- }
- iTaskProcessor.action(TaskAction.ISOLATE);
- StateEvent stateEvent = new StateEvent();
- stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
- stateEvent.setProcessInstanceId(this.processInstance.getId());
- stateEvent.setTaskInstanceId(iTaskProcessor.taskInstance().getId());
- stateEvent.setExecutionStatus(iTaskProcessor.taskInstance().getState());
- this.addStateEvent(stateEvent);
- }
+ DagHelper.getAllPostNodes(Long.toString(taskCode), dag)
+ .stream()
+ .map(Long::parseLong)
+ .forEach(isolatedTaskCode -> {
+ // if the current task is finished, kill the post task
+ // we need to submit an online isolation task event, otherwise there may exist concurrent
+ // problem
+ isolatedTaskCodesToTimesMap.put(isolatedTaskCode,
+ isolatedTaskCodesToTimesMap.getOrDefault(isolatedTaskCode, 0) + 1);
+ ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolatedTaskCode);
+ if (iTaskProcessor == null) {
+ logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}",
+ isolatedTaskCode);
+ return;
+ }
+ iTaskProcessor.action(TaskAction.ISOLATE);
+ StateEvent stateEvent = new StateEvent();
+ stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+ stateEvent.setProcessInstanceId(this.processInstance.getId());
+ stateEvent.setTaskInstanceId(iTaskProcessor.taskInstance().getId());
+ stateEvent.setExecutionStatus(iTaskProcessor.taskInstance().getState());
+ this.addStateEvent(stateEvent);
+
+ });
+
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
@@ -486,12 +504,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
public void cancelTaskIsolation(long taskCode) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
- // todo:
// restart the killed/ paused task
- Set<String> needToOfflineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
- for (String needToOfflineIsolationTaskCodeStr : needToOfflineIsolationTaskCodes) {
- Long isolationTaskCode = Long.valueOf(needToOfflineIsolationTaskCodeStr);
- Integer isolateTimes = isolationTaskCodesToTimesMap.get(isolationTaskCode);
+ Set<String> needToCancelIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
+ for (String needToCancelIsolationTaskCodeStr : needToCancelIsolationTaskCodes) {
+ Long isolationTaskCode = Long.valueOf(needToCancelIsolationTaskCodeStr);
+ Integer isolateTimes = isolatedTaskCodesToTimesMap.get(isolationTaskCode);
if (isolateTimes == null) {
logger.warn(
"The current task has not been isolated, so it don't need to offline isolation, taskCode: {}",
@@ -499,7 +516,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
continue;
}
if (isolateTimes == 1) {
- isolationTaskCodesToTimesMap.remove(isolationTaskCode);
+ isolatedTaskCodesToTimesMap.remove(isolationTaskCode);
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
if (iTaskProcessor == null) {
// the current task has not been submitted
@@ -521,7 +538,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
continue;
}
} else {
- isolationTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1);
+ isolatedTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1);
}
}
@@ -899,15 +916,28 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
- List<IsolationTask> isolationTasks =
- isolationTaskDao.queryByWorkflowInstanceId(processInstance.getId(), IsolationTaskStatus.ONLINE);
- for (IsolationTask isolationTask : isolationTasks) {
- Set<String> allPostNodes = DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag);
- allPostNodes.forEach(postNode -> {
- isolationTaskCodesToTimesMap.put(isolationTask.getTaskCode(),
- isolationTaskCodesToTimesMap.getOrDefault(isolationTask.getTaskCode(), 0) + 1);
- });
- }
+ isolationMetadataManager.queryIsolationTasksByWorkflowInstanceId(processInstance.getId())
+ .forEach(isolationTask -> {
+ DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag)
+ .stream()
+ .map(Long::parseLong)
+ .forEach(taskCode -> {
+ isolatedTaskCodesToTimesMap.put(
+ taskCode,
+ isolatedTaskCodesToTimesMap.getOrDefault(taskCode, 0) + 1);
+ });
+ });
+ coronationMetadataManager.getCoronationTasksByWorkflowInstanceId(processInstance.getId())
+ .forEach(coronationTask -> {
+ DagHelper.getAllPreNodes(Long.toString(coronationTask.getTaskCode()), dag)
+ .stream()
+ .map(Long::parseLong)
+ .forEach(taskCode -> {
+ coronattedTaskCodeToTimesMap.put(
+ taskCode,
+ coronattedTaskCodeToTimesMap.getOrDefault(taskCode, 0) + 1);
+ });
+ });
logger.info("Build dag success, dag: {}", dag);
}
@@ -1043,11 +1073,28 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
try {
- if (isolationTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) {
+ // todo: use TaskInstanceDTO
+ TaskRuntimeContext taskRuntimeContext = new TaskRuntimeContext();
+ if (isolatedTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) {
+ taskRuntimeContext.setIsolationTask(
+ isolationMetadataManager.isIsolatedTask(processInstance.getId(), taskInstance.getTaskCode()));
+ taskRuntimeContext.setHasBeenIsolated(true);
taskInstance.setState(ExecutionStatus.PAUSE_BY_ISOLATION);
logger.info("The current task has been isolated, will set status to PAUSE_BY_ISOLATION, taskCode: {}",
taskInstance.getTaskCode());
}
+ if (coronationMetadataManager.isInCoronationMode()) {
+ if (!coronattedTaskCodeToTimesMap.containsKey(taskInstance.getTaskCode())) {
+ taskInstance.setState(ExecutionStatus.PAUSE_BY_CORONATION);
+ logger.info(
+ "The current server mode is in coronation, and the task: {} is not in coronation list, will pause it",
+ taskInstance.getTaskCode());
+ }
+ taskRuntimeContext.setCoronationTask(coronationMetadataManager.isCoronationTask(processInstance.getId(),
+ taskInstance.getTaskCode()));
+ taskRuntimeContext.setHasBeenCoronatted(true);
+ }
+ taskInstance.setRuntimeContext(JSONUtils.toJsonString(taskRuntimeContext));
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
@@ -1784,6 +1831,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (task == null) {
continue;
}
+ if (coronationMetadataManager.isInCoronationMode()
+ && skipTaskNodeMap.containsKey(Long.toString(task.getTaskCode()))) {
+ logger.info(
+ "The current server mode is in coronation, and the task: {} is in skip list, will ignore this task",
+ task.getTaskCode());
+ continue;
+ }
+
// stop tasks which is retrying if forced success happens
if (task.taskCanRetry()) {
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
@@ -1921,6 +1976,65 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
+    /**
+     * Registers a coronation task on this workflow instance.
+     * <p>
+     * The forbidden upstream tasks of the coronation task are put into the skip list first. Every remaining
+     * upstream node of the coronation task gets its coronation reference count increased by one, and an upstream
+     * task which is currently paused by coronation is put back into the standby list so it is submitted again.
+     *
+     * @param coronationTask the coronation task to bring online
+     */
+    public void onlineTaskCoronation(CoronationTaskDTO coronationTask) {
+        long taskCode = coronationTask.getTaskCode();
+        // set the forbidden task nodes
+        coronationTask.getForbiddenUpstreamTasks().forEach(forbiddenTaskNode -> {
+            String forbiddenTaskNodeStr = Long.toString(forbiddenTaskNode.getTaskCode());
+            skipTaskNodeMap.put(forbiddenTaskNodeStr, dag.getNode(forbiddenTaskNodeStr));
+        });
+        // find the upstream tasks
+        Set<String> parentNodes = DagHelper.getAllPreNodes(Long.toString(taskCode), dag);
+        for (String parentNodeCodeStr : parentNodes) {
+            long parentNodeCode = Long.parseLong(parentNodeCodeStr);
+            if (skipTaskNodeMap.containsKey(parentNodeCodeStr)) {
+                logger.info("The parent node: {} is in skip list, no need to coronation", parentNodeCodeStr);
+                continue;
+            }
+            // the reference count belongs to the parent node, so it has to be read with the parent node code
+            // (reading it with the coronation task code would reset the count on every call)
+            coronattedTaskCodeToTimesMap.put(parentNodeCode,
+                    coronattedTaskCodeToTimesMap.getOrDefault(parentNodeCode, 0) + 1);
+            if (validTaskMap.get(parentNodeCode) == null) {
+                // no task instance has been created for this node yet
+                logger.info("The current task: {} has been added to coronation", parentNodeCode);
+                continue;
+            }
+            // activeTaskProcessorMaps is looked up by task code everywhere else in this class
+            // (see addTaskIsolation / cancelTaskIsolation), not by task instance id
+            ITaskProcessor parentTaskProcessor = activeTaskProcessorMaps.get(parentNodeCode);
+            if (parentTaskProcessor == null) {
+                // the task instance exists but has no active processor, nothing to recover
+                logger.info("The current task: {} has been added to coronation", parentNodeCode);
+                continue;
+            }
+            TaskInstance taskInstance = parentTaskProcessor.taskInstance();
+            if (taskInstance.getState().typeIsPauseByCoronation()) {
+                // resubmit the task to the standby list, this task will be submitted again
+                taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+                logger.info(
+                        "The current task: {} has been added to coronation, will recovery from pause_by_coronation, now set the status to submitted_success, it will submit again",
+                        parentNodeCode);
+                addTaskToStandByList(taskInstance);
+            }
+        }
+    }
+
+    /**
+     * Takes one coronation reference away from every upstream node of the given coronation task.
+     * A node whose reference count is exactly one is removed from the coronation list, any other
+     * registered node just gets its count decreased by one. Nodes that are not registered are left alone.
+     *
+     * @param coronationTask the coronation task that is cancelled
+     */
+    public void cancelTaskCoronation(CoronationTaskDTO coronationTask) {
+        String coronationTaskCode = Long.toString(coronationTask.getTaskCode());
+        DagHelper.getAllPreNodes(coronationTaskCode, dag)
+                .stream()
+                .map(Long::parseLong)
+                .forEach(upstreamTaskCode -> {
+                    Integer referenceCount = coronattedTaskCodeToTimesMap.get(upstreamTaskCode);
+                    if (referenceCount == null) {
+                        // this upstream node was never put under coronation
+                        return;
+                    }
+                    if (referenceCount == 1) {
+                        logger.info("The current task: {} has been removed from coronation list", upstreamTaskCode);
+                        coronattedTaskCodeToTimesMap.remove(upstreamTaskCode);
+                    } else {
+                        coronattedTaskCodeToTimesMap.put(upstreamTaskCode, referenceCount - 1);
+                    }
+                });
+    }
+
private enum WorkflowRunnableStatus {
CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED,
;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
new file mode 100644
index 0000000000..68aba94bda
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java
@@ -0,0 +1,273 @@
+package org.apache.dolphinscheduler.server.master.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.coronation.RefreshCoronationMetadataRequest;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRPCClient;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.transformer.MasterCommandTransformer;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+public class CoronationMetadataManager {
+
+ @Autowired
+ private CoronationTaskDao coronationTaskDao;
+
+ @Autowired
+ private RegistryClient registryClient;
+
+ @Autowired
+ private ProcessInstanceDao processInstanceDao;
+
+ @Autowired
+ private TaskInstanceDao taskInstanceDao;
+
+ @Autowired
+ private MasterCommandTransformer commandTransformer;
+
+ @Autowired
+ private CommandDao commandDao;
+
+ @Autowired
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+ @Autowired
+ private MasterRPCClient masterRPCClient;
+
+ private volatile CoronationMode coronationMode = CoronationMode.NOT_IN_CORONATION;
+
+ // coronationId -> coronationTask
+ private volatile Map<Long, CoronationTaskDTO> coronationTaskInMemory = new HashMap<>();
+ private final Map<Integer, Set<CoronationTaskDTO>> workflowCoronationTaskMap = new HashMap<>();
+ // workflowInstanceId -> coronationTaskCodes
+ private final Map<Integer, Set<Long>> workflowCoronationTaskCodeMap = new HashMap<>();
+
+ /**
+ * Loads the coronation metadata once after this bean has been constructed,
+ * by delegating to {@link #refreshCoronationTaskMetadata()}.
+ */
+ @PostConstruct
+ public void init() {
+ refreshCoronationTaskMetadata();
+ }
+
+    /**
+     * Synchronizes the in-memory coronation metadata with the DB.
+     * <p>
+     * The coronation tasks found in the DB are compared by id with the ones currently held in memory: tasks
+     * only present in the DB are added, tasks only present in memory are cancelled. The server switches to
+     * coronation mode as soon as at least one coronation task exists and closes coronation again once no
+     * coronation task is left.
+     */
+    public synchronized void refreshCoronationTaskMetadata() {
+        StopWatch stopWatch = StopWatch.createStarted();
+        Map<Long, CoronationTaskDTO> coronationTaskInDB = coronationTaskDao.queryAllCoronationTasks()
+                .stream()
+                .collect(Collectors.toMap(CoronationTaskDTO::getId, Function.identity()));
+
+        List<CoronationTaskDTO> addCoronationTasks = coronationTaskInDB.values()
+                .stream()
+                .filter(coronationTask -> !coronationTaskInMemory.containsKey(coronationTask.getId()))
+                .collect(Collectors.toList());
+        List<CoronationTaskDTO> deleteCoronationTasks = coronationTaskInMemory.values()
+                .stream()
+                .filter(coronationTask -> !coronationTaskInDB.containsKey(coronationTask.getId()))
+                .collect(Collectors.toList());
+        coronationTaskInMemory = coronationTaskInDB;
+        if (coronationTaskInMemory.isEmpty()) {
+            if (coronationMode == CoronationMode.IN_CORONATION) {
+                log.info("There is not coronation tasks, will begin to close coronation mode...");
+                closeCoronation();
+                coronationMode = CoronationMode.NOT_IN_CORONATION;
+                log.info("Close coronation mode success...");
+            }
+            // The removed tasks are no longer reachable through coronationTaskInMemory after this refresh,
+            // so this is the only chance to drop them from the per-workflow maps. Without it the maps keep
+            // stale entries that would still be visible in the next coronation round.
+            cancelCoronationTasks(deleteCoronationTasks);
+        } else {
+            addCoronationTasks(addCoronationTasks);
+            cancelCoronationTasks(deleteCoronationTasks);
+            if (coronationMode == CoronationMode.NOT_IN_CORONATION) {
+                coronationMode = CoronationMode.IN_CORONATION;
+                log.info("Open the coronation mode...");
+            }
+        }
+        stopWatch.stop();
+        log.info("Refresh coronation task from DB finished, cost: {}", stopWatch.getTime());
+    }
+
+    /**
+     * Tells whether the given task is registered as a coronation task of the given workflow instance.
+     *
+     * @param workflowInstanceId id of the workflow instance
+     * @param taskCode           code of the task
+     * @return {@code true} if the task code is recorded for the workflow instance, {@code false} otherwise
+     */
+    public boolean isCoronationTask(int workflowInstanceId, long taskCode) {
+        Set<Long> coronationTaskCodes = workflowCoronationTaskCodeMap.get(workflowInstanceId);
+        return coronationTaskCodes != null && coronationTaskCodes.contains(taskCode);
+    }
+
+    /**
+     * Deletes the coronation record of the given task from the DB. When no coronation record is left
+     * afterwards, a refresh request is sent to every registered master.
+     * Failures while notifying the masters are logged and not propagated.
+     *
+     * @param taskCode code of the coronation task whose record should be deleted
+     */
+    public void deleteCoronationMetadataInDB(long taskCode) {
+        CoronationTaskDTO finishedCoronationTask = coronationTaskInMemory.values()
+                .stream()
+                .filter(candidate -> candidate.getTaskCode() == taskCode)
+                .findAny()
+                .orElse(null);
+        if (finishedCoronationTask == null) {
+            // no coronation task with this code is held in memory
+            return;
+        }
+        coronationTaskDao.deleteById(finishedCoronationTask.getId());
+        if (coronationTaskDao.queryAllCoronationTaskNumber() != 0) {
+            // other coronation tasks still exist
+            return;
+        }
+        // send the coronation refresh request to all masters
+        try {
+            log.info("There is no coronation tasks, will send refresh coronation meta data to all master");
+            List<Server> masters = registryClient.getServerList(NodeType.MASTER);
+            RefreshCoronationMetadataRequest request = new RefreshCoronationMetadataRequest();
+            for (Server master : masters) {
+                try {
+                    Host masterHost = new Host(master.getHost(), master.getPort());
+                    masterRPCClient.sendSyncCommand(masterHost, request.convert2Command());
+                } catch (Exception e) {
+                    // one unreachable master must not stop the notification of the others
+                    log.error(
+                            "Close coronation failed, send refresh coronation metadata request to master: {} error",
+                            master, e);
+                }
+            }
+        } catch (Exception ex) {
+            log.error(
+                    "Close coronation failed, meet an unknown exception, will wait the master to auto refresh the coronation metadata",
+                    ex);
+        }
+    }
+
+    /**
+     * Leaves coronation mode: creates one recovery command for every workflow instance that is paused by
+     * coronation and has no recovery command yet.
+     * <p>
+     * The master registry lock is taken first so that only one master creates the recovery commands.
+     * NOTE(review): this assumes a workflow instance paused by coronation carries the PAUSE_BY_CORONATION
+     * state (the original code queried PAUSE_BY_ISOLATION here, which looks like a copy/paste slip) — confirm.
+     */
+    private void closeCoronation() {
+        // The current server is in coronation mode, need to close coronation.
+        // Need to acquire a lock to guarantee there is only one master recovery the pause_by_coronation workflow
+        try {
+            // block to acquire the master lock
+            if (!registryClient.getLock(NodeType.MASTER.getRegistryPath())) {
+                log.error("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath());
+                return;
+            }
+            // find all the instances that need to be recovered and create a recovery command for each of them
+            List<Command> needToInsertCommand =
+                    processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_CORONATION)
+                            .stream()
+                            .filter(processInstance -> {
+                                List<Command> commands = commandDao
+                                        .queryRecoveryCoronationCommandByWorkflowInstanceId(processInstance.getId());
+                                // this workflow instance has not been recovered yet
+                                return CollectionUtils.isEmpty(commands);
+                            })
+                            .map(processInstance -> {
+                                List<Integer> needToRecoveryTaskInstanceIds = taskInstanceDao
+                                        .queryValidatedTaskInstanceByWorkflowInstanceIdAndStatus(
+                                                processInstance.getId(), ExecutionStatus.PAUSE_BY_CORONATION)
+                                        .stream()
+                                        .map(TaskInstance::getId)
+                                        .collect(Collectors.toList());
+                                return commandTransformer.transformToRecoveryFromCoronationPauseCommand(processInstance,
+                                        needToRecoveryTaskInstanceIds);
+                            })
+                            .collect(Collectors.toList());
+            commandDao.batchInsertCommand(needToInsertCommand);
+        } finally {
+            registryClient.releaseLock(NodeType.MASTER.getRegistryPath());
+        }
+    }
+
+    /**
+     * Records the given coronation tasks in the per-workflow maps and, for every workflow instance that is
+     * cached on this master, forwards the task to its {@link WorkflowExecuteRunnable}.
+     * A failure for one task is logged and does not stop the processing of the remaining tasks.
+     *
+     * @param coronationTasksNeedToOnline coronation tasks that are new compared to the in-memory state
+     */
+    private void addCoronationTasks(List<CoronationTaskDTO> coronationTasksNeedToOnline) {
+        if (CollectionUtils.isEmpty(coronationTasksNeedToOnline)) {
+            return;
+        }
+        for (CoronationTaskDTO newCoronationTask : coronationTasksNeedToOnline) {
+            int workflowInstanceId = newCoronationTask.getWorkflowInstanceId();
+            Set<Long> registeredTaskCodes =
+                    workflowCoronationTaskCodeMap.computeIfAbsent(workflowInstanceId, k -> new HashSet<>());
+            registeredTaskCodes.add(newCoronationTask.getTaskCode());
+            Set<CoronationTaskDTO> registeredTasks =
+                    workflowCoronationTaskMap.computeIfAbsent(workflowInstanceId, k -> new HashSet<>());
+            registeredTasks.add(newCoronationTask);
+
+            WorkflowExecuteRunnable runnable =
+                    processInstanceExecCacheManager.getByProcessInstanceId(workflowInstanceId);
+            if (runnable != null) {
+                try {
+                    LoggerUtils.setWorkflowInstanceIdMDC(workflowInstanceId);
+                    log.info("Begin to add new coronation task: {}", newCoronationTask);
+                    runnable.onlineTaskCoronation(newCoronationTask);
+                } catch (Exception ex) {
+                    log.error("Add new coronation task: {} failed, meet an unknown exception", newCoronationTask, ex);
+                } finally {
+                    LoggerUtils.removeWorkflowInstanceIdMDC();
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes the given coronation tasks from the per-workflow maps and, for every workflow instance that is
+     * cached on this master, forwards the cancellation to its {@link WorkflowExecuteRunnable}.
+     * A failure for one task is logged and does not stop the processing of the remaining tasks.
+     *
+     * @param coronationTasksNeedToOffline coronation tasks that are held in memory but no longer in the DB
+     */
+    private void cancelCoronationTasks(List<CoronationTaskDTO> coronationTasksNeedToOffline) {
+        if (CollectionUtils.isEmpty(coronationTasksNeedToOffline)) {
+            return;
+        }
+        for (CoronationTaskDTO coronationTask : coronationTasksNeedToOffline) {
+            Set<Long> taskCodes = workflowCoronationTaskCodeMap.get(coronationTask.getWorkflowInstanceId());
+            Set<CoronationTaskDTO> coronationTasks =
+                    workflowCoronationTaskMap.get(coronationTask.getWorkflowInstanceId());
+            if (taskCodes != null) {
+                taskCodes.remove(coronationTask.getTaskCode());
+                // both maps are filled together, the guard only protects against them getting out of sync
+                if (coronationTasks != null) {
+                    coronationTasks.remove(coronationTask);
+                }
+                if (taskCodes.isEmpty()) {
+                    workflowCoronationTaskCodeMap.remove(coronationTask.getWorkflowInstanceId());
+                    workflowCoronationTaskMap.remove(coronationTask.getWorkflowInstanceId());
+                }
+            }
+            WorkflowExecuteRunnable workflowExecuteRunnable =
+                    processInstanceExecCacheManager.getByProcessInstanceId(coronationTask.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                // the workflow instance is not cached on this master
+                continue;
+            }
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(coronationTask.getWorkflowInstanceId());
+                log.info("Begin to cancel coronation task: {}", coronationTask);
+                workflowExecuteRunnable.cancelTaskCoronation(coronationTask);
+            } catch (Exception ex) {
+                log.error("Cancel coronation task: {} failed, meet an unknown exception", coronationTask, ex);
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+ /**
+ * @return {@code true} if the current coronation mode is {@link CoronationMode#IN_CORONATION}
+ */
+ public boolean isInCoronationMode() {
+ return coronationMode == CoronationMode.IN_CORONATION;
+ }
+
+ /**
+ * Returns the coronation tasks recorded in memory for the given workflow instance.
+ *
+ * @param workflowInstanceId id of the workflow instance
+ * @return the recorded coronation tasks, or an empty set if none is recorded for the workflow instance
+ */
+ public Set<CoronationTaskDTO> getCoronationTasksByWorkflowInstanceId(int workflowInstanceId) {
+ return workflowCoronationTaskMap.getOrDefault(workflowInstanceId, Collections.emptySet());
+ }
+
+ /**
+ * The coronation state of this server.
+ * Each constant stores a numeric code and a description; no accessor for them is declared in this enum.
+ */
+ public enum CoronationMode {
+
+ IN_CORONATION(0, "The current server is already in coronation mode"),
+ NOT_IN_CORONATION(1, "The current server is not in coronation mode"),
+ ;
+
+ private final int code;
+ private final String desc;
+
+ CoronationMode(int code, String desc) {
+ this.code = code;
+ this.desc = desc;
+ }
+
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java
new file mode 100644
index 0000000000..d90a95a42d
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationMetadataManager.java
@@ -0,0 +1,136 @@
+package org.apache.dolphinscheduler.server.master.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+public class IsolationMetadataManager {
+
+ @Autowired
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+ private volatile Map<Long, IsolationTask> isolationTasksInMemory = new HashMap<>();
+ private final Map<Integer, Set<Long>> workflowIsolatedTaskCodeMap = new HashMap<>();
+ private final Map<Integer, Set<IsolationTask>> workflowIsolatedTaskMap = new HashMap<>();
+ @Autowired
+ private IsolationTaskDao isolationTaskDao;
+
+ /**
+ * Loads the isolation metadata once after this bean has been constructed,
+ * by delegating to {@link #refreshIsolationTaskMetadata()}.
+ */
+ @PostConstruct
+ public void init() {
+ refreshIsolationTaskMetadata();
+ }
+
+    /**
+     * Synchronizes the in-memory isolation metadata with the DB.
+     * <p>
+     * The isolation tasks found in the DB are compared by id with the ones currently held in memory: tasks
+     * that are only in memory are cancelled, tasks that are only in the DB are added. Cancellation runs
+     * before addition.
+     */
+    public synchronized void refreshIsolationTaskMetadata() {
+        StopWatch stopWatch = StopWatch.createStarted();
+
+        Map<Long, IsolationTask> isolationTasksInDB =
+                isolationTaskDao.queryAllIsolationTask()
+                        .stream()
+                        .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
+
+        // held in memory but gone from the DB -> the isolation has been cancelled
+        List<IsolationTask> needToCancelIsolationTasks = isolationTasksInMemory.values()
+                .stream()
+                .filter(isolationTask -> !isolationTasksInDB.containsKey(isolationTask.getId()))
+                .collect(Collectors.toList());
+
+        // present in the DB but not yet in memory -> a new isolation
+        List<IsolationTask> needToAddIsolationTasks = isolationTasksInDB.values()
+                .stream()
+                .filter(isolationTask -> !isolationTasksInMemory.containsKey(isolationTask.getId()))
+                .collect(Collectors.toList());
+
+        isolationTasksInMemory = isolationTasksInDB;
+
+        cancelIsolationTask(needToCancelIsolationTasks);
+        addIsolationTask(needToAddIsolationTasks);
+
+        stopWatch.stop();
+        log.info("Refresh isolation task from db finished, cost: {} ms", stopWatch.getTime());
+    }
+
+    /**
+     * Tells whether the given task is registered as an isolation task of the given workflow instance.
+     *
+     * @param workflowInstanceId id of the workflow instance
+     * @param taskCode           code of the task
+     * @return {@code true} if the task code is recorded for the workflow instance, {@code false} otherwise
+     */
+    public boolean isIsolatedTask(int workflowInstanceId, long taskCode) {
+        Set<Long> isolatedTaskCodes = workflowIsolatedTaskCodeMap.get(workflowInstanceId);
+        return isolatedTaskCodes != null && isolatedTaskCodes.contains(taskCode);
+    }
+
+    /**
+     * Removes the given isolation tasks from the per-workflow maps and, for every workflow instance that is
+     * cached on this master, forwards the cancellation to its {@link WorkflowExecuteRunnable}.
+     *
+     * @param needCancelIsolationTasks isolation tasks that are held in memory but no longer in the DB
+     */
+    private void cancelIsolationTask(List<IsolationTask> needCancelIsolationTasks) {
+        if (CollectionUtils.isEmpty(needCancelIsolationTasks)) {
+            return;
+        }
+        for (IsolationTask cancelledIsolation : needCancelIsolationTasks) {
+            int workflowInstanceId = cancelledIsolation.getWorkflowInstanceId();
+            Set<Long> registeredTaskCodes = workflowIsolatedTaskCodeMap.get(workflowInstanceId);
+            if (registeredTaskCodes != null) {
+                Set<IsolationTask> registeredTasks = workflowIsolatedTaskMap.get(workflowInstanceId);
+                registeredTaskCodes.remove(cancelledIsolation.getTaskCode());
+                registeredTasks.remove(cancelledIsolation);
+                if (registeredTaskCodes.isEmpty()) {
+                    // last isolation of this workflow instance, drop its entries completely
+                    workflowIsolatedTaskCodeMap.remove(workflowInstanceId);
+                    workflowIsolatedTaskMap.remove(workflowInstanceId);
+                }
+            }
+
+            WorkflowExecuteRunnable runnable =
+                    processInstanceExecCacheManager.getByProcessInstanceId(workflowInstanceId);
+            if (runnable != null) {
+                runnable.cancelTaskIsolation(cancelledIsolation.getTaskCode());
+                log.info("Backend offline isolation task, isolationTaskId: {}", cancelledIsolation.getId());
+            }
+        }
+    }
+
+    /**
+     * Records the given isolation tasks in the per-workflow maps and, for every workflow instance that is
+     * cached on this master, forwards the task code to its {@link WorkflowExecuteRunnable}.
+     * A failure for one task is logged (including the exception) and does not stop the remaining tasks.
+     *
+     * @param needAddIsolationTasks isolation tasks that are new compared to the in-memory state
+     */
+    private void addIsolationTask(List<IsolationTask> needAddIsolationTasks) {
+        if (CollectionUtils.isEmpty(needAddIsolationTasks)) {
+            return;
+        }
+        for (IsolationTask needAddIsolationTask : needAddIsolationTasks) {
+            workflowIsolatedTaskCodeMap
+                    .computeIfAbsent(needAddIsolationTask.getWorkflowInstanceId(), k -> new HashSet<>())
+                    .add(needAddIsolationTask.getTaskCode());
+            workflowIsolatedTaskMap
+                    .computeIfAbsent(needAddIsolationTask.getWorkflowInstanceId(), k -> new HashSet<>())
+                    .add(needAddIsolationTask);
+            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
+                    .getByProcessInstanceId(needAddIsolationTask.getWorkflowInstanceId());
+            if (workflowExecuteRunnable == null) {
+                // the workflow instance is not cached on this master
+                continue;
+            }
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(needAddIsolationTask.getWorkflowInstanceId());
+                log.info("Begin to add new isolation task, taskCode: {}", needAddIsolationTask.getTaskCode());
+                workflowExecuteRunnable.addTaskIsolation(needAddIsolationTask.getTaskCode());
+            } catch (Exception ex) {
+                // pass the exception as the last argument so the stack trace is not lost
+                log.error("Add new isolation task: {} failed, meet an unknown exception",
+                        needAddIsolationTask.getTaskCode(), ex);
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+ /**
+ * Returns the isolation tasks recorded in memory for the given workflow instance.
+ *
+ * @param workflowInstanceId id of the workflow instance
+ * @return the recorded isolation tasks, or an empty set if none is recorded for the workflow instance
+ */
+ public Set<IsolationTask> queryIsolationTasksByWorkflowInstanceId(int workflowInstanceId) {
+ return workflowIsolatedTaskMap.getOrDefault(workflowInstanceId, Collections.emptySet());
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
deleted file mode 100644
index 2621766d84..0000000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.dolphinscheduler.server.master.service;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
-import org.apache.dolphinscheduler.dao.entity.IsolationTask;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.PostConstruct;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-@Slf4j
-@Service
-public class IsolationTaskManager {
-
- @Autowired
- private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
-
- private volatile Map<Long, IsolationTask> onlineIsolationTasksInMemory = new HashMap<>();
-
- private volatile Set<Long> currentIsolationTaskIdsInMemory = new HashSet<>();
-
- @Autowired
- private IsolationTaskDao isolationTaskDao;
-
- @PostConstruct
- public void init() {
- refreshIsolationTaskMapFromDB();
- }
-
- public void refreshIsolationTaskMapFromDB() {
- Map<Long, IsolationTask> totalOnlineIsolationTasksInDB =
- isolationTaskDao.queryByStatus(IsolationTaskStatus.ONLINE)
- .stream()
- .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
- Set<Long> totalOnlineIsolationTaskCodesInDB = totalOnlineIsolationTasksInDB.keySet();
-
- Collection<IsolationTask> needToOfflineIsolationTasks =
- CollectionUtils.subtract(currentIsolationTaskIdsInMemory, totalOnlineIsolationTaskCodesInDB)
- .stream()
- .map(onlineIsolationTasksInMemory::get)
- .collect(Collectors.toList());
-
- Collection<IsolationTask> needToOnlineIsolationTasks =
- CollectionUtils.subtract(totalOnlineIsolationTaskCodesInDB, currentIsolationTaskIdsInMemory)
- .stream()
- .map(totalOnlineIsolationTasksInDB::get)
- .collect(Collectors.toList());
-
- currentIsolationTaskIdsInMemory = totalOnlineIsolationTaskCodesInDB;
- onlineIsolationTasksInMemory = totalOnlineIsolationTasksInDB;
-
- offlineIsolationTask(needToOfflineIsolationTasks);
- onlineIsolationTask(needToOnlineIsolationTasks);
- }
-
- private void offlineIsolationTask(Collection<IsolationTask> needOfflineIsolationTasks) {
- if (CollectionUtils.isEmpty(needOfflineIsolationTasks)) {
- return;
- }
- for (IsolationTask needOfflineIsolation : needOfflineIsolationTasks) {
- WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
- .getByProcessInstanceId(needOfflineIsolation.getWorkflowInstanceId());
- if (workflowExecuteRunnable == null) {
- continue;
- }
- workflowExecuteRunnable.cancelTaskIsolation(needOfflineIsolation.getTaskCode());
- log.info("Backend offline isolation task, isolationTaskId: {}", needOfflineIsolation.getId());
- }
- }
-
- private void onlineIsolationTask(Collection<IsolationTask> needOnlineIsolationTasks) {
- if (CollectionUtils.isEmpty(needOnlineIsolationTasks)) {
- return;
- }
- for (IsolationTask needOnlineIsolationTask : needOnlineIsolationTasks) {
- WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
- .getByProcessInstanceId(needOnlineIsolationTask.getWorkflowInstanceId());
- if (workflowExecuteRunnable == null) {
- continue;
- }
- workflowExecuteRunnable.onlineTaskIsolation(needOnlineIsolationTask.getTaskCode());
- log.info("Backend online isolation task, isolationTaskId: {}", needOnlineIsolationTask.getId());
- }
- }
-
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformer.java
new file mode 100644
index 0000000000..86f19925c8
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformer.java
@@ -0,0 +1,14 @@
+package org.apache.dolphinscheduler.server.master.transformer;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+
+import java.util.List;
+
+/**
+ * Turns master-side state into {@link Command} objects.
+ */
+public interface MasterCommandTransformer {
+
+    /**
+     * Builds the command used to recover the task instances of a workflow instance that were paused by coronation.
+     *
+     * @param processInstance               workflow instance the command is created for, must not be null
+     * @param needToRecoveryTaskInstanceIds ids of the task instances to recover, must not be null
+     * @return the recovery command for the given workflow instance
+     */
+    Command transformToRecoveryFromCoronationPauseCommand(
+            @NonNull ProcessInstance processInstance,
+            @NonNull List<Integer> needToRecoveryTaskInstanceIds);
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformerImpl.java
new file mode 100644
index 0000000000..d8b900093b
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/transformer/MasterCommandTransformerImpl.java
@@ -0,0 +1,45 @@
+package org.apache.dolphinscheduler.server.master.transformer;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Spring component implementing {@link MasterCommandTransformer}.
+ */
+@Component
+public class MasterCommandTransformerImpl implements MasterCommandTransformer {
+
+    // Injected by Spring; not referenced by any method of this class.
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    /**
+     * Creates a {@link CommandType#RECOVERY_FROM_CORONATION_PAUSE_TASKS} command for the given workflow instance.
+     * The definition code/version, task depend type and failure strategy are copied from the workflow instance;
+     * the task instance ids are JSON-encoded and stored in the command param under
+     * {@link Constants#CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS}.
+     *
+     * @param processInstance               workflow instance the command is created for, must not be null
+     * @param needToRecoveryTaskInstanceIds ids of the task instances to recover, must not be null
+     * @return the populated recovery command
+     * @throws IllegalArgumentException if {@code needToRecoveryTaskInstanceIds} is empty
+     */
+    @Override
+    public Command transformToRecoveryFromCoronationPauseCommand(@NonNull ProcessInstance processInstance,
+                                                                 @NonNull List<Integer> needToRecoveryTaskInstanceIds) {
+        if (CollectionUtils.isEmpty(needToRecoveryTaskInstanceIds)) {
+            throw new IllegalArgumentException("The recovery task instance cannot be empty");
+        }
+
+        Command recoveryCommand = new Command();
+        recoveryCommand.setCommandType(CommandType.RECOVERY_FROM_CORONATION_PAUSE_TASKS);
+        recoveryCommand.setProcessInstanceId(processInstance.getId());
+        recoveryCommand.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+        recoveryCommand.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
+        recoveryCommand.setTaskDependType(processInstance.getTaskDependType());
+        recoveryCommand.setFailureStrategy(processInstance.getFailureStrategy());
+
+        // The command param is a JSON object whose single entry holds the JSON-encoded id list.
+        String encodedTaskInstanceIds = JSONUtils.toJsonString(needToRecoveryTaskInstanceIds);
+        Map<String, String> commandParams = new HashMap<>();
+        commandParams.put(Constants.CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS, encodedTaskInstanceIds);
+        recoveryCommand.setCommandParam(JSONUtils.toJsonString(commandParams));
+        return recoveryCommand;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
index 00ae91019d..4bdacfc3a4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
@@ -38,6 +39,8 @@ public class WorkflowInstanceUtils {
return getStartTaskInstanceIdsFromStateCleanParam(processInstance);
case RECOVERY_FROM_ISOLATION_TASKS:
return getStartTaskInstanceIdsFromRecoverIsolationParam(processInstance);
+ case RECOVERY_FROM_CORONATION_PAUSE_TASKS:
+ return getStartTaskInstanceIdsFromRecoverCoronationParam(processInstance);
default:
return Collections.emptyList();
}
@@ -87,6 +90,21 @@ public class WorkflowInstanceUtils {
return result;
}
+    /**
+     * Reads the task instance ids stored in the command param of the workflow instance under
+     * {@code CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS}.
+     *
+     * @param processInstance workflow instance whose command param is inspected, must not be null
+     * @return the parsed ids, or an empty list when the command param map is empty or no id list can be parsed
+     */
+    public static List<Integer> getStartTaskInstanceIdsFromRecoverCoronationParam(@NonNull ProcessInstance processInstance) {
+        Map<String, String> cmdParams = JSONUtils.toMap(processInstance.getCommandParam());
+        if (MapUtils.isEmpty(cmdParams)) {
+            return Collections.emptyList();
+        }
+        String encodedTaskInstanceIds = cmdParams.get(CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS);
+        List<Integer> coronationPausedTaskInstanceIds =
+                JSONUtils.parseObject(encodedTaskInstanceIds, new TypeReference<ArrayList<Integer>>() {
+                });
+        // Callers always receive a list, never null.
+        return coronationPausedTaskInstanceIds != null
+                ? coronationPausedTaskInstanceIds
+                : Collections.<Integer>emptyList();
+    }
+
public static List<String> getStartNodeName(@NonNull ProcessInstance processInstance) {
List<String> startNodeNameList = new ArrayList<>();
Map<String, String> paramMap = JSONUtils.toMap(processInstance.getCommandParam());
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 856d38cb2f..4e9c9ce9e0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -17,19 +17,9 @@
package org.apache.dolphinscheduler.server.master.runner;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-
-import static org.powermock.api.mockito.PowerMockito.mock;
-
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
-import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
-import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
-import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -37,12 +27,25 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.service.CoronationMetadataManager;
+import org.apache.dolphinscheduler.server.master.service.IsolationMetadataManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.context.ApplicationContext;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@@ -58,15 +61,11 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.springframework.context.ApplicationContext;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.powermock.api.mockito.PowerMockito.mock;
/**
* test for WorkflowExecuteThread
@@ -93,7 +92,9 @@ public class WorkflowExecuteRunnableTest {
private CuringParamsService curingGlobalParamsService;
- private IsolationTaskDao isolationTaskDao;
+ private IsolationMetadataManager isolationMetadataManager;
+
+ private CoronationMetadataManager coronationMetadataManager;
@Before
public void init() throws Exception {
@@ -127,11 +128,12 @@ public class WorkflowExecuteRunnableTest {
curingGlobalParamsService = mock(CuringParamsService.class);
NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
- isolationTaskDao = mock(IsolationTaskDao.class);
+ isolationMetadataManager = mock(IsolationMetadataManager.class);
+ coronationMetadataManager = mock(CoronationMetadataManager.class);
workflowExecuteThread =
PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao,
nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread,
- curingGlobalParamsService, isolationTaskDao));
+ curingGlobalParamsService, isolationMetadataManager, coronationMetadataManager));
// prepareProcess init dag
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index daf92071b5..a69da08951 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -121,7 +121,9 @@ public enum CommandType {
*/
WORKFLOW_EXECUTING_DATA_RESPONSE,
- REFRESH_ISOLATION_REQUEST,
+ REFRESH_ISOLATION_METADATA_REQUEST,
+
+ REFRESH_CORONATION_METADATA_REQUEST,
;
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/coronation/RefreshCoronationMetadataRequest.java
similarity index 66%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/coronation/RefreshCoronationMetadataRequest.java
index 9b097843c4..4f6d94f2a9 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/coronation/RefreshCoronationMetadataRequest.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.remote.command.isolation;
+package org.apache.dolphinscheduler.remote.command.coronation;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
@@ -6,13 +6,14 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import java.io.Serializable;
-public class RefreshIsolationTaskRequest implements Serializable {
+public class RefreshCoronationMetadataRequest implements Serializable {
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.REFRESH_ISOLATION_REQUEST);
+ command.setType(CommandType.REFRESH_CORONATION_METADATA_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
+
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationMetadataRequest.java
similarity index 68%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationMetadataRequest.java
index 9b097843c4..0241823a08 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationMetadataRequest.java
@@ -1,16 +1,20 @@
package org.apache.dolphinscheduler.remote.command.isolation;
+import lombok.Data;
+import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import java.io.Serializable;
-public class RefreshIsolationTaskRequest implements Serializable {
+@Data
+@NoArgsConstructor
+public class RefreshIsolationMetadataRequest implements Serializable {
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.REFRESH_ISOLATION_REQUEST);
+ command.setType(CommandType.REFRESH_ISOLATION_METADATA_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index c1fe78e78a..43000df020 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1048,10 +1048,9 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
break;
case RECOVERY_FROM_ISOLATION_TASKS:
- Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
List<Integer> recoveryPausedIsolationIds =
JSONUtils.parseObject(
- commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
+ cmdParam.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
new TypeReference<ArrayList<Integer>>() {
});
if (CollectionUtils.isNotEmpty(recoveryPausedIsolationIds)) {
@@ -1059,7 +1058,7 @@ public class ProcessServiceImpl implements ProcessService {
}
List<Integer> recoveryKilledIsolationIds =
JSONUtils.parseObject(
- commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
+ cmdParam.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
new TypeReference<ArrayList<Integer>>() {
});
if (CollectionUtils.isNotEmpty(recoveryKilledIsolationIds)) {
@@ -1068,6 +1067,18 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.setRunTimes(runTime + 1);
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
break;
+            case RECOVERY_FROM_CORONATION_PAUSE_TASKS:
+                // Ids of the task instances paused by coronation, JSON-encoded in the command param.
+                List<Integer> pauseByCoronationIds =
+                        JSONUtils.parseObject(cmdParam.get(Constants.CMD_PARAM_RECOVERY_PAUSED_BY_CORONATION_TASK_IDS),
+                                new TypeReference<ArrayList<Integer>>() {
+                                });
+                // The parse result can be null when the key is absent (WorkflowInstanceUtils null-checks the
+                // same call), so guard the loop like the RECOVERY_FROM_ISOLATION_TASKS case does instead of
+                // failing with a NullPointerException.
+                if (CollectionUtils.isNotEmpty(pauseByCoronationIds)) {
+                    for (Integer taskId : pauseByCoronationIds) {
+                        // initialize the pause state
+                        initTaskInstance(this.findTaskInstanceById(taskId));
+                    }
+                }
+                processInstance.setRunTimes(runTime + 1);
+                processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                break;
default:
break;
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
index a83184e16b..6f78e37629 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
@@ -67,6 +67,8 @@ public enum ExecutionStatus {
DISPATCH(17, "dispatch"),
PAUSE_BY_ISOLATION(18, "paused by isolation"),
KILL_BY_ISOLATION(19, "killed by isolation"),
+
+ PAUSE_BY_CORONATION(20, "paused by coronation"),
;
ExecutionStatus(int code, String descp) {
@@ -120,7 +122,7 @@ public enum ExecutionStatus {
*/
public boolean typeIsFinished() {
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() || typeIsPauseByIsolation()
- || typeIsStop() || typeIsBlock();
+ || typeIsStop() || typeIsBlock() || typeIsPauseByCoronation();
}
public boolean typeIsReady() {
@@ -149,6 +151,10 @@ public enum ExecutionStatus {
return this == PAUSE_BY_ISOLATION;
}
+    /**
+     * Tells whether this status is {@link #PAUSE_BY_CORONATION}.
+     *
+     * @return {@code true} only when this constant is {@code PAUSE_BY_CORONATION}
+     */
+    public boolean typeIsPauseByCoronation() {
+        return PAUSE_BY_CORONATION == this;
+    }
+
public boolean typeIsKilledByIsolation() {
return this == KILL_BY_ISOLATION;
}