You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/10/28 06:50:16 UTC
[dolphinscheduler] 01/01: Implement isolate task api interface
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev_wenjun_isolateTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 8268714c9ac26cb2602ebf762ed8284d2a830123
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Oct 12 22:14:37 2022 +0800
Implement isolate task api interface
---
.../api/checker/IsolationTaskChecker.java | 34 +++
.../api/checker/IsolationTaskCheckerImpl.java | 206 +++++++++++++++++
.../api/checker/WorkflowInstanceCheckerImpl.java | 24 +-
.../constants/ApiFuncIdentificationConstant.java | 5 +
.../api/controller/IsolationTaskController.java | 96 ++++++++
.../dto/request/CleanTaskInstanceStateRequest.java | 2 +
...equest.java => IsolationTaskCancelRequest.java} | 8 +-
...equest.java => IsolationTaskDeleteRequest.java} | 6 +-
.../dto/request/IsolationTaskListingRequest.java | 24 ++
.../dto/request/IsolationTaskSubmitRequest.java | 18 ++
.../apache/dolphinscheduler/api/enums/Status.java | 39 +++-
.../api/remote/ApiServerRPCClient.java | 35 +++
.../api/service/IsolationTaskService.java | 32 +++
.../api/service/ProcessInstanceService.java | 8 +-
.../api/service/ProjectService.java | 2 +
.../api/service/WorkflowDAGService.java | 15 ++
.../api/service/impl/IsolationTaskServiceImpl.java | 246 +++++++++++++++++++++
.../api/service/impl/ProjectServiceImpl.java | 7 +-
.../api/service/impl/TaskInstanceServiceImpl.java | 25 ++-
.../api/service/impl/WorkflowDagServiceImpl.java | 64 ++++++
.../api/transformer/CommandTransformer.java | 13 +-
.../api/transformer/CommandTransformerImpl.java | 70 +++---
.../api/vo/IsolationTaskExcelParseVO.java | 25 +++
.../src/main/resources/static/excel-en.xlsx | Bin 0 -> 5718 bytes
.../apache/dolphinscheduler/common/Constants.java | 5 +-
.../dolphinscheduler/common/enums/CommandType.java | 2 +
.../dao/dto/IsolationTaskStatus.java | 27 +++
.../dolphinscheduler/dao/entity/IsolationTask.java | 37 ++++
.../dao/mapper/IsolationTaskMapper.java | 32 +++
.../dao/repository/IsolationTaskDao.java | 39 ++++
.../dao/repository/ProcessDefinitionLogDao.java | 12 +
.../dao/repository/ProcessInstanceDao.java | 1 +
.../dao/repository/ProcessTaskRelationLogDao.java | 11 +
.../dao/repository/TaskDefinitionLogDao.java | 12 +
.../dao/repository/TaskInstanceDao.java | 2 +
.../dao/repository/impl/IsolationTaskDaoImpl.java | 86 +++++++
.../impl/ProcessDefinitionLogDaoImpl.java | 25 +++
.../impl/ProcessTaskRelationLogDaoImpl.java | 24 ++
.../repository/impl/TaskDefinitionLogDaoImpl.java | 44 ++++
.../dao/repository/impl/TaskInstanceDaoImpl.java | 6 +
.../dolphinscheduler/dao/utils/DagHelper.java | 99 ++++++++-
.../dao/mapper/IsolationTaskMapper.xml | 97 ++++++++
.../src/main/resources/sql/dolphinscheduler_h2.sql | 17 ++
.../main/resources/sql/dolphinscheduler_mysql.sql | 73 +++---
.../resources/sql/dolphinscheduler_postgresql.sql | 18 ++
.../dolphinscheduler/dao/utils/DagHelperTest.java | 33 ++-
.../isolation/RefreshIsolationTaskProcessor.java | 26 +++
.../server/master/rpc/MasterRPCServer.java | 8 +
.../master/runner/FailoverExecuteThread.java | 5 +
.../master/runner/MasterSchedulerBootstrap.java | 7 +-
.../master/runner/WorkflowExecuteRunnable.java | 130 ++++++++++-
.../master/runner/task/BaseTaskProcessor.java | 16 ++
.../master/runner/task/BlockingTaskProcessor.java | 11 +-
.../master/runner/task/CommonTaskProcessor.java | 24 +-
.../master/runner/task/ConditionTaskProcessor.java | 11 +-
.../master/runner/task/DependentTaskProcessor.java | 8 +
.../master/runner/task/SubTaskProcessor.java | 11 +
.../master/runner/task/SwitchTaskProcessor.java | 8 +
.../server/master/runner/task/TaskAction.java | 1 +
.../master/service/IsolationTaskManager.java | 97 ++++++++
.../server/master/utils/TaskInstanceUtils.java | 1 -
.../server/master/utils/WorkflowInstanceUtils.java | 18 ++
.../master/runner/WorkflowExecuteRunnableTest.java | 6 +-
.../remote/command/CommandType.java | 6 +-
.../isolation/RefreshIsolationTaskRequest.java | 18 ++
.../service/process/ProcessServiceImpl.java | 27 ++-
.../plugin/task/api/enums/ExecutionStatus.java | 26 ++-
67 files changed, 2037 insertions(+), 134 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
new file mode 100644
index 0000000000..9ee19f10be
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java
@@ -0,0 +1,34 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.List;
+
+public interface IsolationTaskChecker {
+
+ void checkCanSubmitTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ ProcessInstance processInstance,
+ @NonNull List<IsolationTaskExcelParseVO> voList);
+
+ void checkCanOnlineTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ @NonNull ProcessInstance processInstance,
+ @NonNull IsolationTask isolationTask);
+
+ void checkCanListingTaskIsolation(@NonNull User loginUser,
+ long projectCode);
+
+ void checkCanDeleteTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ long isolationId);
+
+ void checkCanCancelTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ ProcessInstance processInstance,
+ @NonNull IsolationTask isolationTasks);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
new file mode 100644
index 0000000000..99402f7fb6
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java
@@ -0,0 +1,206 @@
+package org.apache.dolphinscheduler.api.checker;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_CANCEL;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_DELETE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_LIST;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_ONLINE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_SUBMIT;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
+
+@Component
+public class IsolationTaskCheckerImpl implements IsolationTaskChecker {
+
+ @Autowired
+ private ProjectService projectService;
+
+ @Autowired
+ private ProcessDefinitionLogDao processDefinitionLogDao;
+
+ @Autowired
+ private WorkflowDAGService workflowDAGService;
+
+ @Autowired
+ private IsolationTaskDao isolationTaskDao;
+
+ @Override
+ public void checkCanSubmitTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ ProcessInstance processInstance,
+ @NonNull List<IsolationTaskExcelParseVO> voList) {
+ Project project = projectService.queryByCode(loginUser, projectCode);
+ if (processInstance == null) {
+ throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+ }
+
+ ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
+ .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion())
+ .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
+ checkSubmitIsolateTaskAuth(loginUser, project, processDefinitionLog);
+
+ DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(processDefinitionLog);
+ checkWorkflowInstanceCanSubmitIsolateTask(processInstance.getName(), processInstance);
+
+ checkTaskNodeCanSubmitIsolateTask(voList, workflowDAG);
+ }
+
+ @Override
+ public void checkCanOnlineTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ ProcessInstance processInstance,
+ @NonNull IsolationTask isolationTask) {
+ Project project = projectService.queryByCode(loginUser, projectCode);
+ checkOnlineIsolationTaskAuth(loginUser, project, processInstance);
+
+ if (IsolationTaskStatus.ONLINE.getCode() == isolationTask.getStatus()) {
+ throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE);
+ }
+
+ String workflowInstanceName = isolationTask.getWorkflowInstanceName();
+ checkWorkflowInstanceCanSubmitIsolateTask(workflowInstanceName, processInstance);
+ }
+
+ @Override
+ public void checkCanListingTaskIsolation(@NonNull User loginUser, long projectCode) {
+ Project project = projectService.queryByCode(loginUser, projectCode);
+
+ projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_LIST);
+ }
+
+ @Override
+ public void checkCanDeleteTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ long isolationId) {
+ Project project = projectService.queryByCode(loginUser, projectCode);
+ projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_DELETE);
+
+ IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
+ .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
+ if (isolationTask.getStatus() != IsolationTaskStatus.OFFLINE.getCode()) {
+ throw new ServiceException(ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE);
+ }
+ }
+
+ @Override
+ public void checkCanCancelTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ ProcessInstance processInstance,
+ @NonNull IsolationTask isolationTask) {
+ if (processInstance == null) {
+ throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+ }
+ Project project = projectService.queryByCode(loginUser, projectCode);
+ checkCancelIsolationTaskAuth(loginUser, project, processInstance);
+
+ if (isolationTask.getStatus() == IsolationTaskStatus.OFFLINE.getCode()) {
+ throw new ServiceException(ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL);
+ }
+ }
+
+ private void checkWorkflowInstanceCanSubmitIsolateTask(@NonNull String workflowInstanceName,
+ ProcessInstance processInstance) {
+ // check if the given workflow instance can do isolate operation
+ // If the workflow instance is at ready_xx status, it cannot do isolate operation
+ if (processInstance == null) {
+ throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceName);
+ }
+ if (processInstance.getState().typeIsReady()) {
+ throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT, workflowInstanceName);
+ }
+ }
+
+ private void checkTaskNodeCanSubmitIsolateTask(@NonNull List<IsolationTaskExcelParseVO> voList,
+ @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+ for (IsolationTaskExcelParseVO vo : voList) {
+ String taskCodeStr = Long.toString(vo.getTaskCode());
+ // check if the taskNode exist in DAG
+ if (!workflowDAG.containsNode(taskCodeStr)) {
+ throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST, vo.getTaskName());
+ }
+ // check if the pre task exist an online isolation task
+ // if existed, we cannot create a new isolation task
+ Set<String> allPreNodes = DagHelper.getAllPreNodes(taskCodeStr, workflowDAG);
+ List<IsolationTask> isolationTasks = isolationTaskDao.queryByTaskCodes(vo.getWorkflowInstanceId(),
+ allPreNodes.stream().map(Long::parseLong).collect(Collectors.toList()));
+ // todo: Do we need to support if the sub isolation task is offline?
+ if (CollectionUtils.isNotEmpty(isolationTasks)) {
+ throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK);
+ }
+ }
+ }
+
+ private void checkSubmitIsolateTaskAuth(@NonNull User loginUser,
+ @NonNull Project project,
+ @NonNull ProcessDefinitionLog processDefinitionLog) {
+ if (processDefinitionLog.getProjectCode() != project.getCode()) {
+ throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
+ }
+ projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_SUBMIT);
+ }
+
+ private void checkOnlineIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
+ if (processInstance == null) {
+ throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+ }
+ ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
+ .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion())
+ .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
+ if (processDefinitionLog.getProjectCode() != project.getCode()) {
+ throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
+ }
+
+ projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_ONLINE);
+ }
+
+ private void checkCancelIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
+ if (processInstance == null) {
+ throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
+ }
+ ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
+ .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion())
+ .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
+ if (processDefinitionLog.getProjectCode() != project.getCode()) {
+ throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
+ }
+ projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_CANCEL);
+
+ }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java
index 1805934628..05e708d3b0 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java
@@ -2,11 +2,11 @@ package org.apache.dolphinscheduler.api.checker;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -14,13 +14,12 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.enums.Status.CLEAN_TASK_INSTANCE_ERROR_WORKFLOW_INSTANCE_IS_RUNNING;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
-import static org.apache.dolphinscheduler.api.enums.Status.PROJECT_NOT_EXIST;
@Component
public class WorkflowInstanceCheckerImpl implements WorkflowInstanceChecker {
@Autowired
- private ProcessDefinitionDao processDefinitionDao;
+ private ProcessDefinitionLogDao processDefinitionLogDao;
@Autowired
private ProjectService projectService;
@@ -30,16 +29,13 @@ public class WorkflowInstanceCheckerImpl implements WorkflowInstanceChecker {
if (processInstance == null) {
throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
}
- // todo: check permission
- ProcessDefinition processDefinition =
- processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode());
- if (processDefinition == null) {
- throw new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getProcessDefinitionCode());
- }
- Project project = projectService.queryByCode(loginUser, processDefinition.getProjectCode());
- if (project == null) {
- throw new ServiceException(PROJECT_NOT_EXIST);
- }
+ ProcessDefinitionLog processDefinitionLog =
+ processDefinitionLogDao
+ .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion())
+ .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
+ processInstance.getProcessDefinitionCode()));
+ Project project = projectService.queryByCode(loginUser, processDefinitionLog.getProjectCode());
projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CLEAN_TASK_INSTANCE_STATE);
// check state
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
index fba7677ab7..c8cdd2cb58 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java
@@ -96,6 +96,11 @@ public class ApiFuncIdentificationConstant {
public static final String INSTANCE_BATCH_DELETE = "project:process-instance:batch-delete";
public static final String FORCED_SUCCESS = "project:task-instance:force-success";
public static final String CLEAN_TASK_INSTANCE_STATE = "project:task-instance:clean-state";
+ public static final String ISOLATION_TASK_LIST = "project:isolation-task:list";
+ public static final String ISOLATION_TASK_ONLINE = "project:isolation-task:online";
+ public static final String ISOLATION_TASK_SUBMIT = "project:isolation-task:submit";
+ public static final String ISOLATION_TASK_CANCEL = "project:isolation-task:cancel";
+ public static final String ISOLATION_TASK_DELETE = "project:isolation-task:delete";
public static final String VIEW_LOG = "project:log:detail";
public static final String DOWNLOAD_LOG = "project:log:download-log";
public static final String PROJECT_OVERVIEW = "project:overview:view";
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
new file mode 100644
index 0000000000..f4b445456d
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java
@@ -0,0 +1,96 @@
+package org.apache.dolphinscheduler.api.controller;
+
+import io.swagger.annotations.Api;
+import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
+import org.apache.dolphinscheduler.api.service.IsolationTaskService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+import springfox.documentation.annotations.ApiIgnore;
+
+import javax.validation.Valid;
+
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_LISTING_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR;
+
+@Api(tags = "ISOLATION_TASK_TAG")
+@RestController
+@RequestMapping("/projects/{projectCode}/isolation-task")
+public class IsolationTaskController {
+
+ @Autowired
+ private IsolationTaskService isolationTaskService;
+
+ @PostMapping(value = "/submit")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<Void> submitIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @PathVariable long projectCode,
+ @RequestBody IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
+ isolationTaskService.submitTaskIsolations(loginUser, projectCode, isolationTaskSubmitRequest);
+ return Result.success(null);
+ }
+
+ @PutMapping(value = "/online/{id}")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<Void> onlineIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @PathVariable long projectCode,
+ @PathVariable(name = "id") long isolationId) {
+ isolationTaskService.onlineTaskIsolation(loginUser, projectCode, isolationId);
+ return Result.success(null);
+ }
+
+ @PutMapping(value = "/cancel/{id}")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<Void> cancelIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @PathVariable long projectCode,
+ @PathVariable(name = "id") long isolationId) {
+ isolationTaskService.cancelTaskIsolation(loginUser, projectCode, isolationId);
+ return Result.success(null);
+ }
+
+ @GetMapping("")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(ISOLATION_TASK_LISTING_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<PageInfo<IsolationTask>> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @PathVariable long projectCode,
+ @RequestBody @Valid IsolationTaskListingRequest request) {
+ return Result.success(isolationTaskService.listingTaskIsolation(loginUser, projectCode, request));
+ }
+
+ @DeleteMapping("/{id}")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(ISOLATION_TASK_DELETE_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<Void> deleteIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @PathVariable long projectCode,
+ @PathVariable long id) {
+ isolationTaskService.deleteTaskIsolation(loginUser, projectCode, id);
+ return Result.success(null);
+ }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
index c9afc8fabd..3fe81f7a71 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import javax.validation.constraints.NotEmpty;
import java.util.List;
@Data
@@ -11,5 +12,6 @@ import java.util.List;
@AllArgsConstructor
public class CleanTaskInstanceStateRequest {
+ @NotEmpty
private List<Integer> taskInstanceIds;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java
similarity index 60%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java
index c9afc8fabd..a7e00677b5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java
@@ -4,12 +4,14 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import javax.validation.constraints.NotEmpty;
import java.util.List;
@Data
-@NoArgsConstructor
@AllArgsConstructor
-public class CleanTaskInstanceStateRequest {
+@NoArgsConstructor
+public class IsolationTaskCancelRequest {
- private List<Integer> taskInstanceIds;
+ @NotEmpty
+ private List<Long> isolationIds;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java
similarity index 60%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java
index c9afc8fabd..82104c3c9d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java
@@ -4,12 +4,14 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import javax.validation.constraints.NotEmpty;
import java.util.List;
@Data
@NoArgsConstructor
@AllArgsConstructor
-public class CleanTaskInstanceStateRequest {
+public class IsolationTaskDeleteRequest {
- private List<Integer> taskInstanceIds;
+ @NotEmpty
+ private List<Long> isolationIds;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java
new file mode 100644
index 0000000000..fecd65e9c6
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IsolationTaskListingRequest {
+
+ private String workflowInstanceName;
+
+ private String taskName;
+
+ @NotNull(message = "page number cannot be null")
+ private Integer pageNo;
+
+ @NotNull(message = "page size cannot be null")
+ private Integer pageSize;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java
new file mode 100644
index 0000000000..d89b681eaf
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.api.dto.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+
+import javax.validation.constraints.NotEmpty;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IsolationTaskSubmitRequest {
+
+ @NotEmpty
+ private List<IsolationTaskExcelParseVO> isolationTaskExcelParseVOList;
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 37f89b406f..602cb477dd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -513,7 +513,44 @@ public enum Status {
CONFIG_FILTER_EMPTY(1400007, "complement time is empty after filtering according to the configuration",
"当前补数时间根据配置过滤后为空"),
PROJECT_NAME_TOO_LONG_ERROR(1400008, "project name is too long error", "项目名称过长"),
- ;
+
+ ISOLATION_TASK_NOT_EXIST(1500000, "Isolation task not exist", "隔离任务不存在"),
+ ISOLATION_TASK_SUBMIT_ERROR(1500100, "Submit isolation task error", "提交隔离任务异常"),
+ ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT(1500101,
+ "Submit isolation task error, relate workflow instance [{0}] is not support",
+ "提交隔离任务异常, 关联的工作流实例:[{0}]暂不支持该操作"),
+ ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT(1500102,
+ "Submit isolation task error, relate workflow instance [{0}] is not belong to current project",
+ "提交隔离任务异常, 关联的工作流实例:[{0}]不属于当前项目"),
+ ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST(1500103, "Submit isolation task error, task: [{0}] is not exist",
+ "提交隔离任务异常, 任务不存在:[{0}]"),
+ ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK(1500104,
+ "Submit isolation task error, workflow instance: [{0}] exist an sub isolation task",
+ "提交隔离任务异常, 工作流实例已经存在:[{0}]子隔离任务"),
+ ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500105,
+ "Submit isolation task error, send request to master error",
+ "提交隔离任务异常,发送请求给Master异常"),
+
+ ISOLATION_TASK_ONLINE_ERROR(1500200, "Online isolation task error", "上线隔离任务异常"),
+ ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE(1500201,
+ "Online isolation task error, the isolation task is already online", "上线隔离任务异常,该任务已处于隔离中"),
+ ISOLATION_TASK_ONLINE_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500202,
+ "Online isolation task error, send request to master error", "上线隔离任务异常,发送隔离请求给Master异常"),
+ ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR(1500203,
+ "Online isolation task error, the process instance is not belongs to project", "上线隔离任务异常,当前工作流实例不属于目标任务"),
+
+ ISOLATION_TASK_CANCEL_ERROR(1500300, "Cancel isolation task error", "取消隔离任务异常"),
+ ISOLATION_TASK_CANCEL_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500301,
+ "Cancel isolation task error, send request to master error", "取消隔离任务异常,发送隔离请求给Master异常"),
+ ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL(1500302,
+ "Cancel isolation task error, this task isolation is already been cancel", "取消隔离任务异常,该隔离已经下线"),
+ ISOLATION_TASK_CANCEL_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR(1500303,
+ "Cancel isolation task error, the process instance is not belongs to project", "取消隔离任务异常,当前工作流实例不属于目标任务"),
+ ISOLATION_TASK_LISTING_ERROR(1500400, "Listing isolation task error", "查询隔离任务列表异常"),
+ ISOLATION_TASK_DELETE_ERROR(1500500, "Delete isolation task error", "删除隔离任务异常"),
+ ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE(1500501, "Delete isolation task error, the task status is not offline",
+ "删除隔离任务异常,该隔离任务尚未下线"),
+ ;
private final int code;
private final String enMsg;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
new file mode 100644
index 0000000000..ef3634cd52
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java
@@ -0,0 +1,35 @@
+package org.apache.dolphinscheduler.api.remote;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class ApiServerRPCClient {
+
+ private final NettyRemotingClient client;
+
+ private static final long DEFAULT_TIME_OUT_MILLS = 1_000L;
+
+ public ApiServerRPCClient() {
+ client = new NettyRemotingClient(new NettyClientConfig());
+ log.info("Success initialized ApiServerRPCClient...");
+ }
+
+ public void sendSyncCommand(@NonNull Host host,
+ @NonNull Command rpcCommand) throws RemotingException, InterruptedException {
+ sendSyncCommand(host, rpcCommand, DEFAULT_TIME_OUT_MILLS);
+ }
+
+ public void sendSyncCommand(@NonNull Host host,
+ @NonNull Command rpcCommand,
+ long timeoutMills) throws RemotingException, InterruptedException {
+ client.sendSync(host, rpcCommand, timeoutMills);
+ }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
new file mode 100644
index 0000000000..7225d19007
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java
@@ -0,0 +1,32 @@
+package org.apache.dolphinscheduler.api.service;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskCancelRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+public interface IsolationTaskService {
+
+ void submitTaskIsolations(@NonNull User loginUser,
+ long projectCode,
+ @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest);
+
+ void onlineTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ long isolationTaskId);
+
+ void cancelTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ long isolationId);
+
+ PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ @NonNull IsolationTaskListingRequest request);
+
+ void deleteTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ long id);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 58baef26a6..f3a9069a72 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -18,16 +18,16 @@
package org.apache.dolphinscheduler.api.service;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
/**
* process instance service
*/
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
index 716e2fba87..64fcf8fd99 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
+import lombok.NonNull;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -44,6 +45,7 @@ public interface ProjectService {
* @param projectCode project code
* @return project detail information
*/
+ @NonNull
Project queryByCode(User loginUser, long projectCode);
/**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java
new file mode 100644
index 0000000000..739c11a616
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.api.service;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
+public interface WorkflowDAGService {
+
+ DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull Long processDefinitionCode,
+ @NonNull Integer processDefinitionVersion);
+
+ DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull ProcessDefinitionLog processDefinitionLog);
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
new file mode 100644
index 0000000000..4f6ff908b1
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java
@@ -0,0 +1,246 @@
+package org.apache.dolphinscheduler.api.service.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.dolphinscheduler.api.checker.IsolationTaskChecker;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
+import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.remote.ApiServerRPCClient;
+import org.apache.dolphinscheduler.api.service.IsolationTaskService;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.api.transformer.CommandTransformer;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR;
+
+@Slf4j
+@Service
+public class IsolationTaskServiceImpl implements IsolationTaskService {
+
+ @Autowired
+ private IsolationTaskDao isolationTaskDao;
+
+ @Autowired
+ private IsolationTaskChecker isolationTaskChecker;
+
+ @Autowired
+ private ProcessInstanceDao processInstanceDao;
+
+ @Autowired
+ private CommandTransformer commandTransformer;
+
+ @Autowired
+ private ApiServerRPCClient apiServerRPCClient;
+
+ @Autowired
+ private RegistryClient registryClient;
+
+ @Autowired
+ private ProcessService processService;
+
+ @Autowired
+ private WorkflowDAGService workflowDAGService;
+
+ @Autowired
+ private TaskInstanceDao taskInstanceDao;
+
+ @Override
+ @Transactional
+ public void submitTaskIsolations(@NonNull User loginUser,
+ long projectCode,
+ @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
+ Map<Integer, List<IsolationTaskExcelParseVO>> workflow2VoMap =
+ isolationTaskSubmitRequest.getIsolationTaskExcelParseVOList()
+ .stream().collect(
+ HashMap::new, (map, vo) -> {
+ map.computeIfAbsent(vo.getWorkflowInstanceId(), k -> new ArrayList<>()).add(vo);
+ },
+ Map::putAll);
+
+ List<Pair<ProcessInstance, List<IsolationTask>>> needToOnlineIsolations = new ArrayList<>();
+ List<IsolationTask> needToInsertIntoDB = new ArrayList<>();
+ for (Map.Entry<Integer, List<IsolationTaskExcelParseVO>> entry : workflow2VoMap.entrySet()) {
+ Integer workflowInstanceId = entry.getKey();
+ List<IsolationTaskExcelParseVO> vos = entry.getValue();
+ ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+ isolationTaskChecker.checkCanSubmitTaskIsolation(loginUser, projectCode, processInstance, vos);
+
+ List<IsolationTask> isolationTasks = entry.getValue().stream().map(vo -> {
+ return IsolationTask.builder()
+ .workflowInstanceId(vo.getWorkflowInstanceId())
+ .workflowInstanceName(vo.getWorkflowInstanceName())
+ .taskName(vo.getTaskName())
+ .taskCode(vo.getTaskCode())
+ .status(IsolationTaskStatus.ONLINE.getCode())
+ .build();
+ }).collect(Collectors.toList());
+ needToInsertIntoDB.addAll(isolationTasks);
+ needToOnlineIsolations.add(Pair.of(processInstance, isolationTasks));
+ }
+ isolationTaskDao.batchInsert(needToInsertIntoDB);
+ // we split here to avoid rollback RPC request
+ try {
+ refreshIsolationTasks();
+ } catch (Exception ex) {
+ throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR);
+ }
+ }
+
+ @Override
+ @Transactional
+ public void onlineTaskIsolation(@NonNull User loginUser, long projectCode, long isolationTaskId) {
+ IsolationTask isolationTask = isolationTaskDao.queryById(isolationTaskId)
+ .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
+ ProcessInstance processInstance =
+ processInstanceDao.queryProcessInstanceById(isolationTask.getWorkflowInstanceId());
+
+ isolationTaskChecker.checkCanOnlineTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
+ isolationTaskDao.updateIsolationTaskStatus(isolationTaskId, IsolationTaskStatus.ONLINE);
+ try {
+ refreshIsolationTasks();
+ } catch (Exception ex) {
+ throw new ServiceException(Status.ISOLATION_TASK_ONLINE_ERROR);
+ }
+ }
+
+ @Override
+ @Transactional
+ public void cancelTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ long isolationId) {
+ IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
+ .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
+
+ Integer workflowInstanceId = isolationTask.getWorkflowInstanceId();
+ ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+ isolationTaskChecker.checkCanCancelTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
+ isolationTaskDao.updateIsolationTaskStatus(isolationTask.getId(), IsolationTaskStatus.OFFLINE);
+ insertRecoveryCommandIfNeeded(processInstance, isolationTask);
+ try {
+ refreshIsolationTasks();
+ } catch (RemotingException | InterruptedException e) {
+ throw new ServiceException(Status.ISOLATION_TASK_CANCEL_ERROR);
+ }
+ }
+
+ @Override
+ public PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
+ long projectCode,
+ @NonNull IsolationTaskListingRequest request) {
+ isolationTaskChecker.checkCanListingTaskIsolation(loginUser, projectCode);
+
+ Integer pageNo = request.getPageNo();
+ Integer pageSize = request.getPageSize();
+
+ IPage<IsolationTask> iPage = isolationTaskDao.pageQueryIsolationTask(
+ request.getWorkflowInstanceName(),
+ request.getTaskName(),
+ pageNo,
+ pageSize);
+
+ PageInfo<IsolationTask> pageInfo = new PageInfo<>(pageNo, pageSize);
+ pageInfo.setTotal((int) iPage.getTotal());
+ pageInfo.setTotalList(iPage.getRecords());
+ return pageInfo;
+ }
+
+ @Override
+ public void deleteTaskIsolation(@NonNull User loginUser, long projectCode, long id) {
+ isolationTaskChecker.checkCanDeleteTaskIsolation(loginUser, projectCode, id);
+ int deleteNum = isolationTaskDao.deleteByIdAndStatus(id, IsolationTaskStatus.OFFLINE);
+ if (deleteNum <= 0) {
+ throw new ServiceException(ISOLATION_TASK_NOT_EXIST);
+ }
+ }
+
+ private void refreshIsolationTasks() throws RemotingException, InterruptedException {
+ List<Server> masters = registryClient.getServerList(NodeType.MASTER);
+ if (CollectionUtils.isEmpty(masters)) {
+ return;
+ }
+
+ org.apache.dolphinscheduler.remote.command.Command refreshIsolationRequest =
+ new RefreshIsolationTaskRequest().convert2Command();
+ for (Server master : masters) {
+ try {
+ apiServerRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
+ refreshIsolationRequest);
+ } catch (RemotingException | InterruptedException e) {
+ log.error("Send RefreshIsolationTask request to master error, master: {}", master, e);
+ throw e;
+ }
+ }
+ }
+
+ private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance,
+ @NonNull IsolationTask isolationTask) {
+ if (processInstance.getState() != ExecutionStatus.PAUSE_BY_ISOLATION) {
+ return;
+ }
+ int workflowInstanceId = processInstance.getId();
+ // find the isolationTaskInstanceIds need to recovery
+ // find the sub node is in pause or kill
+ DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(
+ processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion());
+
+ List<TaskInstance> taskInstances =
+ taskInstanceDao.queryValidatedTaskInstanceByWorkflowInstanceId(workflowInstanceId);
+ Set<String> onlineIsolationTaskCodes =
+ isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId, IsolationTaskStatus.ONLINE)
+ .stream()
+ .map(onlineIsolationTask -> String.valueOf(onlineIsolationTask.getTaskCode()))
+ .collect(Collectors.toSet());
+
+ List<TaskInstance> canRecoveryTaskInstances = taskInstances.stream()
+ .filter(taskInstance -> taskInstance.getState().typeIsIsolated())
+ .filter(taskInstance -> !DagHelper.isChildOfAnyParentNodes(String.valueOf(taskInstance.getTaskCode()),
+ onlineIsolationTaskCodes, workflowDAG))
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(canRecoveryTaskInstances)) {
+ return;
+ }
+ // find if this taskInstance still exist pre isolationTasks
+ Command command = commandTransformer.transformToRecoveryFromTaskIsolationCommand(processInstance,
+ canRecoveryTaskInstances);
+ processService.createCommand(command);
+ }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index 7f5bdf086b..3720956c45 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.NonNull;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
@@ -168,7 +169,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
* @return project detail information
*/
@Override
- public Project queryByCode(User loginUser, long projectCode) {
+ public @NonNull Project queryByCode(User loginUser, long projectCode) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
hasProjectAndPerm(loginUser, project, result, PROJECT);
@@ -204,10 +205,8 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
perm)) {
// check read permission
Project checkProject = projectMapper.queryByCode(projectCode);
- putMsg(result, Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(),
+ throw new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(),
Objects.nonNull(checkProject) ? project.getName() : projectCode);
- } else {
- putMsg(result, Status.SUCCESS);
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index f6da23ac0f..9d1dc166a2 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.api.checker.WorkflowInstanceChecker;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
@@ -34,6 +35,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -41,6 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -89,6 +92,12 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired
private TaskInstanceDao taskInstanceDao;
+ @Autowired
+ private ProcessInstanceDao processInstanceDao;
+
+ @Autowired
+ private WorkflowInstanceChecker workflowInstanceChecker;
+
@Autowired
private CommandTransformer commandTransformer;
@@ -219,7 +228,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
List<TaskInstance> existTaskInstances = taskInstanceDao.queryTaskInstanceByIds(taskInstanceIds);
TaskInstanceValidator.validateTaskInstanceAllExists(taskInstanceIds, existTaskInstances);
- Map<Integer, List<Integer>> processInstanceId2TaskInstanceIds =
+ Map<Integer, List<Integer>> processInstanceId2TaskInstanceIdMap =
existTaskInstances.stream().collect(
HashMap::new,
(map, taskInstance) -> {
@@ -228,11 +237,17 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
},
Map::putAll);
- List<Command> cleanStateCommands = commandTransformer.transformToCleanTaskInstanceStateCommands(loginUser,
- processInstanceId2TaskInstanceIds);
-
// todo: use batch create and remove the transactional
- cleanStateCommands.forEach(command -> processService.createCommand(command));
+ for (Map.Entry<Integer, List<Integer>> processInstanceId2TaskInstanceIds : processInstanceId2TaskInstanceIdMap
+ .entrySet()) {
+ Integer workflowInstanceId = processInstanceId2TaskInstanceIds.getKey();
+ List<Integer> needToCleanStateTaskInstanceIds = processInstanceId2TaskInstanceIds.getValue();
+ ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
+ workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, workflowInstance);
+ Command command = commandTransformer.transformToCleanTaskInstanceStateCommand(workflowInstance,
+ needToCleanStateTaskInstanceIds);
+ processService.createCommand(command);
+ }
} catch (ServiceException serviceException) {
throw serviceException;
} catch (Exception ex) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java
new file mode 100644
index 0000000000..f1ffb0f64d
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java
@@ -0,0 +1,64 @@
+package org.apache.dolphinscheduler.api.service.impl;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
+
+@Service
+public class WorkflowDagServiceImpl implements WorkflowDAGService {
+
+ @Autowired
+ private ProcessDefinitionLogDao processDefinitionLogDao;
+
+ // todo: use dao
+ @Autowired
+ private ProcessService processService;
+
+ @Override
+ public DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull Long processDefinitionCode,
+ @NonNull Integer processDefinitionVersion) {
+ ProcessDefinitionLog processDefinitionLog =
+ processDefinitionLogDao.queryProcessDefinitionByCode(processDefinitionCode, processDefinitionVersion)
+ .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processDefinitionCode));
+ return getWorkflowDAG(processDefinitionLog);
+ }
+
+ @Override
+ public DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull ProcessDefinitionLog processDefinitionLog) {
+ List<ProcessTaskRelation> processTaskRelations =
+ processService.findRelationByCode(processDefinitionLog.getCode(), processDefinitionLog.getVersion());
+ List<TaskDefinitionLog> taskDefinitionLogs =
+ processService.getTaskDefineLogListByRelation(processTaskRelations);
+ List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
+
+ ProcessDag processDag = DagHelper.generateFlowDag(
+ taskNodeList,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ TaskDependType.TASK_POST);
+ if (processDag == null) {
+ throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY);
+ }
+
+ return DagHelper.buildDagGraph(processDag);
+ }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java
index 51865effaf..986c2b99a6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java
@@ -2,17 +2,16 @@ package org.apache.dolphinscheduler.api.transformer;
import lombok.NonNull;
import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.List;
-import java.util.Map;
public interface CommandTransformer {
- List<Command> transformToCleanTaskInstanceStateCommands(@NonNull User loginUser,
- @NonNull Map<Integer, List<Integer>> workflowInstanceId2TaskInstanceIds);
+ Command transformToCleanTaskInstanceStateCommand(@NonNull ProcessInstance processInstance,
+ @NonNull List<Integer> needCleanTaskInstanceIds);
- Command transformToCleanTaskInstanceStateCommand(@NonNull User loginUser,
- @NonNull Integer workflowInstanceId,
- @NonNull List<Integer> needCleanTaskInstances);
+ Command transformToRecoveryFromTaskIsolationCommand(@NonNull ProcessInstance processInstance,
+ @NonNull List<TaskInstance> canRecoveryTaskInstances);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java
index 2a0e46033e..9d8aadff43 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java
@@ -1,53 +1,59 @@
package org.apache.dolphinscheduler.api.transformer;
import lombok.NonNull;
-import org.apache.dolphinscheduler.api.checker.WorkflowInstanceChecker;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.springframework.stereotype.Component;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS;
@Component
public class CommandTransformerImpl implements CommandTransformer {
- @Autowired
- private ProcessInstanceDao processInstanceDao;
+ @Override
+ public Command transformToCleanTaskInstanceStateCommand(@NonNull ProcessInstance processInstance,
+ @NonNull List<Integer> needCleanTaskInstanceIds) {
- @Autowired
- private WorkflowInstanceChecker workflowInstanceChecker;
+ Command command = new Command();
+ command.setCommandType(CommandType.START_FROM_STATE_CLEAN_TASKS);
+ command.setExecutorId(processInstance.getExecutorId());
+ command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+ command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
+ // we set the task post strategy, since we will go through the post task if the post task is not running
+ command.setTaskDependType(TaskDependType.TASK_POST);
+ command.setFailureStrategy(processInstance.getFailureStrategy());
+ command.setWarningType(processInstance.getWarningType());
+ command.setWarningGroupId(processInstance.getWarningGroupId());
+ command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
+ command.setWorkerGroup(processInstance.getWorkerGroup());
+ command.setEnvironmentCode(processInstance.getEnvironmentCode());
+ command.setDryRun(processInstance.getDryRun());
- @Override
- public List<Command> transformToCleanTaskInstanceStateCommands(@NonNull User loginUser,
- @NonNull Map<Integer, List<Integer>> workflowInstanceId2TaskInstanceIds) {
- List<Command> commands = new ArrayList<>(workflowInstanceId2TaskInstanceIds.size());
- workflowInstanceId2TaskInstanceIds.forEach((workflowInstanceId, taskInstanceIds) -> {
- Command command = transformToCleanTaskInstanceStateCommand(loginUser, workflowInstanceId, taskInstanceIds);
- commands.add(command);
- });
- return commands;
+ // todo:use pojo to represent CommandParam rather than map
+ Map<String, String> commandParamMap = new HashMap<>();
+ commandParamMap.put(CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS, JSONUtils.toJsonString(needCleanTaskInstanceIds));
+ command.setCommandParam(JSONUtils.toJsonString(commandParamMap));
+ command.setProcessInstanceId(processInstance.getId());
+ return command;
}
@Override
- public Command transformToCleanTaskInstanceStateCommand(@NonNull User loginUser,
- @NonNull Integer workflowInstanceId,
- @NonNull List<Integer> needCleanTaskInstances) {
- ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
- workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, processInstance);
-
+ public Command transformToRecoveryFromTaskIsolationCommand(@NonNull ProcessInstance processInstance,
+ @NonNull List<TaskInstance> canRecoveryIsolationTaskInstances) {
Command command = new Command();
- command.setCommandType(CommandType.START_FROM_STATE_CLEAN_TASKS);
+ command.setCommandType(CommandType.RECOVERY_FROM_ISOLATION_TASKS);
+
command.setExecutorId(processInstance.getExecutorId());
command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
@@ -62,10 +68,22 @@ public class CommandTransformerImpl implements CommandTransformer {
command.setDryRun(processInstance.getDryRun());
// todo:use pojo to represent CommandParam rather than map
+ List<Integer> recoveryPausedIsolationIds = canRecoveryIsolationTaskInstances.stream()
+ .filter(taskInstance -> taskInstance.getState().typeIsPauseByIsolation())
+ .map(TaskInstance::getId)
+ .collect(Collectors.toList());
+ List<Integer> recoveryKilledIsolationIds = canRecoveryIsolationTaskInstances.stream()
+ .filter(taskInstance -> taskInstance.getState().typeIsKilledByIsolation())
+ .map(TaskInstance::getId)
+ .collect(Collectors.toList());
Map<String, String> commandParamMap = new HashMap<>();
- commandParamMap.put(CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS, JSONUtils.toJsonString(needCleanTaskInstances));
+ commandParamMap.put(CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS,
+ JSONUtils.toJsonString(recoveryPausedIsolationIds));
+ commandParamMap.put(CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS,
+ JSONUtils.toJsonString(recoveryKilledIsolationIds));
command.setCommandParam(JSONUtils.toJsonString(commandParamMap));
command.setProcessInstanceId(processInstance.getId());
return command;
}
+
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java
new file mode 100644
index 0000000000..be63400aed
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java
@@ -0,0 +1,25 @@
+package org.apache.dolphinscheduler.api.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IsolationTaskExcelParseVO {
+
+ @NotNull(message = "Workflow instance id cannot be null")
+ private Integer workflowInstanceId;
+
+ @NotNull(message = "Workflow instance name cannot be null")
+ private String workflowInstanceName;
+
+ @NotNull(message = "Task code cannot be null")
+ private Long taskCode;
+
+ @NotNull(message = "Task name cannot be null")
+ private String taskName;
+}
diff --git a/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx b/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx
new file mode 100644
index 0000000000..f334cc6e5e
Binary files /dev/null and b/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx differ
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 0499d43c30..efffbc3842 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -338,6 +338,9 @@ public final class Constants {
public static final String CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS = "CleanStateTaskInstanceIds";
+ public static final String CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS = "RecoveryPausedIsolationTaskInstanceIds";
+ public static final String CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS = "RecoveryKilledIsolationTaskInstanceIds";
+
public static final String CMD_PARAM_START_PARAMS = "StartParams";
public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
@@ -859,5 +862,5 @@ public final class Constants {
* file upload verify
*/
public static final String FILE_TYPE_RESTRICTED_LIST = "file.type.restricted.list";
- public static final String FILE_NAME_RESTRICTED_CONTENT= "file.name.restricted.content";
+ public static final String FILE_NAME_RESTRICTED_CONTENT = "file.name.restricted.content";
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index f9bc11fee3..62d538a58b 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -55,6 +55,8 @@ public enum CommandType {
RECOVER_SERIAL_WAIT(11, "recover serial wait"),
START_FROM_STATE_CLEAN_TASKS(12, "start from state clean tasks"),
+ RECOVERY_FROM_ISOLATION_TASKS(13, "recovery from isolation tasks"),
+
;
CommandType(int code, String descp) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
new file mode 100644
index 0000000000..dcc93cc64b
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/IsolationTaskStatus.java
@@ -0,0 +1,27 @@
+package org.apache.dolphinscheduler.dao.dto;
+
+public enum IsolationTaskStatus {
+
+ ONLINE(0),
+ OFFLINE(1),
+ ;
+
+ private final int code;
+
+ IsolationTaskStatus(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public static IsolationTaskStatus of(int code) {
+ for (IsolationTaskStatus value : IsolationTaskStatus.values()) {
+ if (value.getCode() == code) {
+ return value;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Isolation task status code: %s is invalidated: ", code));
+ }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
new file mode 100644
index 0000000000..8a852aace5
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/IsolationTask.java
@@ -0,0 +1,37 @@
+package org.apache.dolphinscheduler.dao.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Date;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("t_ds_isolation_task")
+public class IsolationTask {
+
+ @TableId(value = "id", type = IdType.AUTO)
+ private Long id;
+
+ private int workflowInstanceId;
+
+ private String workflowInstanceName;
+
+ private String taskName;
+
+ private long taskCode;
+
+ private int status;
+
+ private Date createTime;
+
+ private Date updateTime;
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
new file mode 100644
index 0000000000..babb00c5bb
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java
@@ -0,0 +1,32 @@
+package org.apache.dolphinscheduler.dao.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface IsolationTaskMapper extends BaseMapper<IsolationTask> {
+
+ IPage<IsolationTask> pageQuery(@Param("workflowInstanceName") String workflowInstanceName,
+ @Param("taskName") String taskName,
+ IPage<IsolationTask> page);
+
+ List<IsolationTask> queryByTaskCodes(@NonNull @Param("workflowInstanceId") Integer workflowInstanceId,
+ @NonNull @Param("taskCodes") List<Long> taskCodes);
+
+ void updateIsolationTaskStatus(@Param("id") long isolationTaskId,
+ @Param("status") int status);
+
+ int deleteByIdAndStatus(@Param("id") long isolationTaskId,
+ @Param("status") int status);
+
+ List<IsolationTask> queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId,
+ @Param("status") int status);
+
+ void batchInsert(@Param("isolationTasks") List<IsolationTask> isolationTasks);
+
+ List<IsolationTask> queryByStatus(@Param("status") int code);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
new file mode 100644
index 0000000000..de16a92be0
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java
@@ -0,0 +1,39 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public interface IsolationTaskDao {
+
+ /**
+ * Page query, pageNumber is start from 0.
+ */
+ IPage<IsolationTask> pageQueryIsolationTask(String workflowInstanceName,
+ String taskName,
+ int pageNumber,
+ int pageSize);
+
+ List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes);
+
+ List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId, IsolationTaskStatus isolationTaskStatus);
+
+ Optional<IsolationTask> queryById(long isolationTaskId);
+
+ List<IsolationTask> queryByIds(List<Long> isolationTaskIds);
+
+ List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus);
+
+ int deleteByIdAndStatus(long id, IsolationTaskStatus status);
+
+ void insert(IsolationTask isolationTaskDTO);
+
+ void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus);
+
+ void batchInsert(List<IsolationTask> isolationTasks);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
new file mode 100644
index 0000000000..a8eba87989
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
+import java.util.Optional;
+
+public interface ProcessDefinitionLogDao {
+
+ Optional<ProcessDefinitionLog> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode,
+ @NonNull Integer processDefinitionVersion);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index f218510191..e789101783 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -38,4 +38,5 @@ public interface ProcessInstanceDao {
* @param processInstance processInstance
*/
int upsertProcessInstance(ProcessInstance processInstance);
+
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
new file mode 100644
index 0000000000..6a606e3aef
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
@@ -0,0 +1,11 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+
+import java.util.List;
+
+public interface ProcessTaskRelationLogDao {
+
+ List<ProcessTaskRelationLog> queryProcessTaskRelationLogByWorkflow(long workflowDefinitionCode,
+ int workflowDefinitionVersion);
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
new file mode 100644
index 0000000000..5c8d55b005
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+
+import java.util.List;
+
+public interface TaskDefinitionLogDao {
+
+ List<TaskDefinitionLog> queryTaskDefinitionByRelations(List<ProcessTaskRelationLog> processTaskRelations);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index c5b75fb0d9..3918ad8386 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -10,6 +10,8 @@ public interface TaskInstanceDao {
List<TaskInstance> queryTaskInstanceByIds(List<Integer> taskInstanceId);
+ List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId);
+
/**
* Update the taskInstance, if update failed will throw exception.
*
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
new file mode 100644
index 0000000000..effab3aa16
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java
@@ -0,0 +1,86 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+@Slf4j
+@Repository
+public class IsolationTaskDaoImpl implements IsolationTaskDao {
+
+ @Autowired
+ private IsolationTaskMapper isolationTaskMapper;
+
+ @Override
+ public IPage<IsolationTask> pageQueryIsolationTask(String workflowInstanceName,
+ String taskName,
+ int pageNumber,
+ int pageSize) {
+ Page<IsolationTask> page = new Page<>(pageNumber, pageSize);
+ return isolationTaskMapper.pageQuery(workflowInstanceName, taskName, page);
+ }
+
+ @Override
+ public List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes) {
+ if (CollectionUtils.isEmpty(taskCodes)) {
+ return Collections.emptyList();
+ }
+ return isolationTaskMapper.queryByTaskCodes(workflowInstanceId, taskCodes);
+ }
+
+ @Override
+ public List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId,
+ IsolationTaskStatus isolationTaskStatus) {
+ return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId, isolationTaskStatus.getCode());
+ }
+
+ @Override
+ public Optional<IsolationTask> queryById(long isolationTaskId) {
+ return Optional.ofNullable(isolationTaskMapper.selectById(isolationTaskId));
+ }
+
+ @Override
+ public List<IsolationTask> queryByIds(List<Long> isolationTaskIds) {
+ return isolationTaskMapper.selectBatchIds(isolationTaskIds);
+ }
+
+ @Override
+ public List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus) {
+ return isolationTaskMapper.queryByStatus(isolationTaskStatus.getCode());
+ }
+
+ @Override
+ public int deleteByIdAndStatus(long id, IsolationTaskStatus status) {
+ return isolationTaskMapper.deleteByIdAndStatus(id, status.getCode());
+ }
+
+ @Override
+ public void insert(IsolationTask isolationTask) {
+ isolationTaskMapper.insert(isolationTask);
+ }
+
+ @Override
+ public void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus) {
+ isolationTaskMapper.updateIsolationTaskStatus(isolationTaskId, isolationTaskStatus.getCode());
+ }
+
+ @Override
+ public void batchInsert(List<IsolationTask> isolationTasks) {
+ if (CollectionUtils.isEmpty(isolationTasks)) {
+ return;
+ }
+ isolationTaskMapper.batchInsert(isolationTasks);
+ }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
new file mode 100644
index 0000000000..1c77aea33f
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
@@ -0,0 +1,25 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.Optional;
+
+@Repository
+public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao {
+
+ @Autowired
+ private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
+ @Override
+ public Optional<ProcessDefinitionLog> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode,
+ @NonNull Integer processDefinitionVersion) {
+ return Optional.ofNullable(
+ processDefinitionLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode,
+ processDefinitionVersion));
+ }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
new file mode 100644
index 0000000000..9cdf6c7de6
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+@Repository
+public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao {
+
+ @Autowired
+ private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+
+ @Override
+ public List<ProcessTaskRelationLog> queryProcessTaskRelationLogByWorkflow(
+ long workflowDefinitionCode,
+ int workflowDefinitionVersion) {
+ return processTaskRelationLogMapper.queryByProcessCodeAndVersion(workflowDefinitionCode,
+ workflowDefinitionVersion);
+ }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
new file mode 100644
index 0000000000..205fc3b649
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
@@ -0,0 +1,44 @@
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Repository
+public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao {
+
+ @Autowired
+ private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+ @Override
+ public List<TaskDefinitionLog> queryTaskDefinitionByRelations(List<ProcessTaskRelationLog> processTaskRelations) {
+ List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
+ if (CollectionUtils.isEmpty(processTaskRelations)) {
+ return taskDefinitionLogs;
+ }
+ Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
+ for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+ if (processTaskRelation.getPreTaskCode() > 0) {
+ taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion());
+ }
+ if (processTaskRelation.getPostTaskCode() > 0) {
+ taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
+ }
+ }
+
+ taskCodeVersionMap.forEach((code, version) -> {
+ taskDefinitionLogs.add(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version));
+ });
+ return taskDefinitionLogs;
+ }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index 510b5d7a71..0512980b10 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -3,6 +3,7 @@ package org.apache.dolphinscheduler.dao.repository.impl;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.exception.RepositoryException;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@@ -28,6 +29,11 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
return taskInstanceMapper.selectBatchIds(taskInstanceIds);
}
+ @Override
+ public List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId) {
+ return taskInstanceMapper.findValidTaskListByProcessId(workflowInstanceId, Flag.YES);
+ }
+
@Override
public void updateTaskInstance(@NonNull TaskInstance taskInstance) throws RepositoryException {
try {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index 3f25bb0f49..3ab01171e1 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -17,7 +17,9 @@
package org.apache.dolphinscheduler.dao.utils;
+import com.google.common.collect.Sets;
import lombok.NonNull;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -30,9 +32,11 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import org.apache.commons.collections.CollectionUtils;
-
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -41,12 +45,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
/**
* dag tools
*/
@@ -591,4 +589,89 @@ public class DagHelper {
}
return false;
}
+
+ /**
+ * Get all post nodes of the given node.
+ * <p>
+ * e.g. the given DAG is A -> B -> C, and taskNode is A, return result is A,B,C
+ */
+ public static Set<String> getAllPostNodes(@NonNull String taskNode,
+ @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+ Set<String> result = Sets.newHashSet();
+ Set<String> currentLoopNodes = Sets.newHashSet(taskNode);
+ Set<String> visitedNodes = new HashSet<>();
+ while (!currentLoopNodes.isEmpty()) {
+ Set<String> tmp = new HashSet<>();
+ for (String currentNode : currentLoopNodes) {
+ result.add(currentNode);
+ Set<String> subsequentNodes = workflowDAG.getSubsequentNodes(currentNode);
+ visitedNodes.add(currentNode);
+ subsequentNodes.forEach(s -> {
+ if (!visitedNodes.contains(s)) {
+ tmp.add(s);
+ }
+ });
+ }
+ currentLoopNodes = tmp;
+ }
+ return result;
+ }
+
+ /**
+ * Get all pre nodes of the given node.
+ * <p>
+ * e.g. the given DAG is A -> B -> C, and taskNode is B, return result is A,B
+ */
+ public static Set<String> getAllPreNodes(@NonNull String taskNode,
+ @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+ Set<String> result = Sets.newHashSet();
+ Set<String> currentLoopNodes = Sets.newHashSet(taskNode);
+ Set<String> visitedNodes = new HashSet<>();
+ while (CollectionUtils.isNotEmpty(currentLoopNodes)) {
+ Set<String> tmp = new HashSet<>();
+ for (String currentNode : currentLoopNodes) {
+ result.add(currentNode);
+ visitedNodes.add(currentNode);
+ workflowDAG.getPreviousNodes(currentNode).forEach(s -> {
+ if (!visitedNodes.contains(s)) {
+ tmp.add(s);
+ }
+ });
+ }
+ currentLoopNodes = tmp;
+ }
+ return result;
+ }
+
+ /**
+ * Judge if the givenTaskNode is the child of any given parentNodes or is belongs to parentNodes.
+ */
+ public static boolean isChildOfAnyParentNodes(@NonNull String givenTaskNode,
+ @NonNull Set<String> parentNodes,
+ @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
+ if (CollectionUtils.isEmpty(parentNodes)) {
+ return false;
+ }
+
+ if (parentNodes.contains(givenTaskNode)) {
+ return true;
+ }
+
+ Set<String> postNodesOfParent = new HashSet<>();
+ Set<String> visited = new HashSet<>();
+ while (CollectionUtils.isNotEmpty(parentNodes)) {
+ Set<String> tmp = new HashSet<>();
+ for (String parentNode : parentNodes) {
+ postNodesOfParent.add(parentNode);
+ visited.add(parentNode);
+ workflowDAG.getSubsequentNodes(parentNode).forEach(node -> {
+ if (!visited.contains(node)) {
+ tmp.add(node);
+ }
+ });
+ }
+ parentNodes = tmp;
+ }
+ return postNodesOfParent.contains(givenTaskNode);
+ }
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
new file mode 100644
index 0000000000..4a7100010e
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper">
+ <sql id="baseSql">
+ id
+ ,workflow_instance_id, workflow_instance_name, task_name, task_code, status, create_time, update_time
+ </sql>
+
+ <select id="pageQuery" resultType="string">
+ select
+ <include refid="baseSql"/>
+ from t_ds_isolation_task
+ where 1 = 1
+ <if test="workflowInstanceName != null">
+ and workflow_instance_name = #{workflowInstanceName}
+ </if>
+ <if test="taskName != null">
+ and task_name = #{taskName}
+ </if>
+ </select>
+
+ <select id="queryByTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+ select
+ <include refid="baseSql"/>
+ from t_ds_isolation_task
+ where workflow_instance_id = #{workflowInstanceId}
+ and task_code in
+ <foreach collection="taskCodes" item="taskCode" open="(" separator="," close=")">
+ #{taskCode}
+ </foreach>
+ </select>
+
+ <select id="queryByWorkflowInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+ select
+ <include refid="baseSql"/>
+ from t_ds_isolation_task
+ where workflow_instance_id = #{workflowInstanceId} and status = #{status}
+ </select>
+
+ <select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
+ select
+ <include refid="baseSql"/>
+ from t_ds_isolation_task
+ where status = #{status}
+ </select>
+
+ <update id="updateIsolationTaskStatus">
+ update t_ds_isolation_task
+ set status = #{status}
+ where id = #{id}
+ </update>
+
+ <insert id="batchInsert">
+ insert into t_ds_isolation_task (
+ workflow_instance_id,
+ workflow_instance_name,
+ task_name,
+ task_code,
+ status
+ )
+ values
+ <foreach collection="isolationTasks" item="isolationTask" separator=",">
+ (
+ #{isolationTask.workflowInstanceId},
+ #{isolationTask.workflowInstanceName},
+ #{isolationTask.taskName},
+ #{isolationTask.taskCode},
+ #{isolationTask.status}
+ )
+ </foreach>
+ </insert>
+
+ <delete id="deleteByIdAndStatus">
+ delete
+ from t_ds_isolation_task
+ where id = #{id}
+ and status = #{status}
+ </delete>
+
+</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index 2422318c10..8f6df6738c 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -1977,3 +1977,20 @@ CREATE TABLE t_ds_fav
user_id int NOT NULL,
PRIMARY KEY (id)
);
+
+-- ----------------------------
+-- Table structure for t_ds_isolation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_isolation_task`;
+CREATE TABLE `t_ds_isolation_task`
+(
+ `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+ `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the isolation task',
+ `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
+ `task_name` varchar(255) NOT NULL COMMENT 'isolation task name',
+ `task_code` bigint NOT NULL COMMENT 'isolation task code',
+ `status` tinyint(4) DEFAULT '0',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+ `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ PRIMARY KEY (`id`)
+);
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 6df6a9b0fe..e5b69db827 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -1896,22 +1896,22 @@ CREATE TABLE `t_ds_k8s` (
-- Table structure for t_ds_k8s_namespace
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_k8s_namespace`;
-CREATE TABLE `t_ds_k8s_namespace` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `limits_memory` int(11) DEFAULT NULL,
- `namespace` varchar(100) DEFAULT NULL,
- `online_job_num` int(11) DEFAULT NULL,
- `owner` varchar(100) DEFAULT NULL,
- `pod_replicas` int(11) DEFAULT NULL,
- `pod_request_cpu` decimal(14,3) DEFAULT NULL,
- `pod_request_memory` int(11) DEFAULT NULL,
- `tag` varchar(100) DEFAULT NULL,
- `limits_cpu` decimal(14,3) DEFAULT NULL,
- `k8s` varchar(100) DEFAULT NULL,
- `create_time` datetime DEFAULT NULL COMMENT 'create time',
- `update_time` datetime DEFAULT NULL COMMENT 'update time',
- PRIMARY KEY (`id`),
- UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
+CREATE TABLE `t_ds_k8s_namespace`(
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ `limits_memory` int(11) DEFAULT NULL,
+ `namespace` varchar(100) DEFAULT NULL,
+ `online_job_num` int(11) DEFAULT NULL,
+ `owner` varchar(100) DEFAULT NULL,
+ `pod_replicas` int(11) DEFAULT NULL,
+ `pod_request_cpu` decimal(14, 3) DEFAULT NULL,
+ `pod_request_memory` int(11) DEFAULT NULL,
+ `tag` varchar(100) DEFAULT NULL,
+ `limits_cpu` decimal(14, 3) DEFAULT NULL,
+ `k8s` varchar(100) DEFAULT NULL,
+ `create_time` datetime DEFAULT NULL COMMENT 'create time',
+ `update_time` datetime DEFAULT NULL COMMENT 'update time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
@@ -1919,21 +1919,22 @@ CREATE TABLE `t_ds_k8s_namespace` (
-- Table structure for t_ds_alert_send_status
-- ----------------------------
DROP TABLE IF EXISTS t_ds_alert_send_status;
-CREATE TABLE t_ds_alert_send_status(
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `alert_id` int(11) NOT NULL,
- `alert_plugin_instance_id` int(11) NOT NULL,
- `send_status` tinyint(4) DEFAULT '0',
- `log` text,
- `create_time` datetime DEFAULT NULL COMMENT 'create time',
- PRIMARY KEY (`id`),
- UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)
-) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
+DROP TABLE IF EXISTS `t_ds_fav_task`;
-- ----------------------------
-- Table structure for t_ds_fav_task
-- ----------------------------
-DROP TABLE IF EXISTS `t_ds_fav_task`;
+CREATE TABLE t_ds_alert_send_status
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ `alert_id` int(11) NOT NULL,
+ `alert_plugin_instance_id` int(11) NOT NULL,
+ `send_status` tinyint(4) DEFAULT '0',
+ `log` text,
+ `create_time` datetime DEFAULT NULL COMMENT 'create time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE TABLE `t_ds_fav_task`
(
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'favorite task id',
@@ -1943,3 +1944,21 @@ CREATE TABLE `t_ds_fav_task`
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
+
+-- ----------------------------
+-- Table structure for t_ds_isolation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_isolation_task`;
+CREATE TABLE `t_ds_isolation_task`
+(
+ `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+ `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the isolation task',
+ `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
+ `task_name` varchar(255) NOT NULL COMMENT 'isolation task name',
+ `task_code` bigint NOT NULL COMMENT 'isolation task code',
+ `status` tinyint(4) DEFAULT '0',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+ `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ PRIMARY KEY (`id`),
+ KEY `workflow_instance_index` (`workflow_instance_id`) USING BTREE
+) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index 15bf31236a..f9bb5587cc 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -1941,3 +1941,21 @@ CREATE TABLE t_ds_fav
user_id int NOT NULL,
PRIMARY KEY (id)
);
+
+-- ----------------------------
+-- Table structure for t_ds_isolation_task
+-- ----------------------------
+DROP TABLE if exists `t_ds_isolation_task`;
+CREATE TABLE `t_ds_isolation_task`
+(
+ `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+ `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the isolation task',
+ `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
+ `task_name` varchar(255) NOT NULL COMMENT 'isolation task name',
+ `task_code` bigint NOT NULL COMMENT 'isolation task code',
+ `status` tinyint(4) DEFAULT '0',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+ `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ PRIMARY KEY (`id`),
+ CONSTRAINT `workflow_instance_index` (`workflow_instance_id`)
+);
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
index a80660b549..de1890f150 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.dao.utils;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Sets;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
@@ -32,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
+import org.junit.Assert;
+import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,10 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
/**
* dag helper test
@@ -335,9 +334,8 @@ public class DagHelperTest {
* 2->8->5->7
*
* @return dag
- * @throws JsonProcessingException if error throws JsonProcessingException
*/
- private DAG<String, TaskNode, TaskNodeRelation> generateDag() throws IOException {
+ private DAG<String, TaskNode, TaskNodeRelation> generateDag() {
List<TaskNode> taskNodeList = new ArrayList<>();
TaskNode node1 = new TaskNode();
node1.setId("1");
@@ -529,4 +527,23 @@ public class DagHelperTest {
Assert.assertNotNull(dag);
}
+ @Test
+ public void getAllPostNodes() {
+ DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
+ Set<String> allPostNodes = DagHelper.getAllPostNodes("2", dag);
+ Assert.assertEquals(Sets.newHashSet("2", "3", "5", "6", "7", "8"), allPostNodes);
+ }
+
+ @Test
+ public void getAllPreNodes() {
+ DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
+ Set<String> allPostNodes = DagHelper.getAllPreNodes("2", dag);
+ Assert.assertEquals(Sets.newHashSet("1", "2"), allPostNodes);
+ }
+
+ @Test
+ public void isChildOfAnyParentNodes() {
+ DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
+ Assert.assertTrue(DagHelper.isChildOfAnyParentNodes("2", Sets.newHashSet("1"), dag));
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
new file mode 100644
index 0000000000..826a456fd8
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java
@@ -0,0 +1,26 @@
+package org.apache.dolphinscheduler.server.master.processor.isolation;
+
+import io.netty.channel.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class RefreshIsolationTaskProcessor implements NettyRequestProcessor {
+
+ @Autowired
+ private IsolationTaskManager isolationTaskManager;
+
+ @Override
+ public void process(Channel channel, Command command) {
+ if (command.getType() != CommandType.REFRESH_ISOLATION_REQUEST) {
+ throw new IllegalArgumentException(String.format("The current rpc command: %s is invalidated", command));
+ }
+ isolationTaskManager.refreshIsolationTaskMapFromDB();
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index c2eb200059..eb4d82cd3d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingData
import javax.annotation.PostConstruct;
+import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationTaskProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -74,6 +76,9 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired
private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
+ @Autowired
+ private RefreshIsolationTaskProcessor refreshIsolationTaskProcessor;
+
public void start() {
logger.info("Starting Master RPC Server...");
// init remoting server
@@ -96,6 +101,9 @@ public class MasterRPCServer implements AutoCloseable {
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_REQUEST,
+ refreshIsolationTaskProcessor);
+
this.nettyRemotingServer.start();
logger.info("Started Master RPC Server...");
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 5546b474d7..6446248c06 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,9 @@ public class FailoverExecuteThread extends BaseDaemonThread {
@Autowired
private MasterFailoverService masterFailoverService;
+ @Autowired
+ private IsolationTaskManager isolationTaskManager;
+
protected FailoverExecuteThread() {
super("FailoverExecuteThread");
}
@@ -66,6 +70,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a master server start
masterFailoverService.checkMasterFailover();
+ isolationTaskManager.refreshIsolationTaskMapFromDB();
} catch (Exception e) {
logger.error("Master failover thread execute error", e);
} finally {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 24982437a2..1d823a71a7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@@ -108,6 +109,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired
private StateEventCallbackService stateEventCallbackService;
+ @Autowired
+ private IsolationTaskDao isolationTaskDao;
+
private String masterAddress;
protected MasterSchedulerBootstrap() {
@@ -184,7 +188,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
processAlertManager,
masterConfig,
stateWheelExecuteThread,
- curingGlobalParamsService);
+ curingGlobalParamsService,
+ isolationTaskDao);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index d05bdf8015..e102238178 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -37,8 +37,10 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
@@ -47,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@@ -188,6 +191,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final Set<String> stateCleanTaskCodes = new HashSet<>();
+ // isolationTaskCode -> isolatedTimes
+ private final Map<Long, Integer> isolationTaskCodesToTimesMap = new HashMap<>();
+
/**
* state event queue
*/
@@ -208,6 +214,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final CuringParamsService curingParamsService;
+ private final IsolationTaskDao isolationTaskDao;
+
private final String masterAddress;
/**
@@ -227,7 +235,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
@NonNull ProcessAlertManager processAlertManager,
@NonNull MasterConfig masterConfig,
@NonNull StateWheelExecuteThread stateWheelExecuteThread,
- @NonNull CuringParamsService curingParamsService) {
+ @NonNull CuringParamsService curingParamsService,
+ @NonNull IsolationTaskDao isolationTaskDao) {
this.processService = processService;
this.processInstanceDao = processInstanceDao;
this.processInstance = processInstance;
@@ -235,6 +244,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
this.curingParamsService = curingParamsService;
+ this.isolationTaskDao = isolationTaskDao;
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
}
@@ -435,6 +445,84 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
+ public void onlineTaskIsolation(long taskCode) {
+ try {
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+ // find the post running task
+ Set<String> needToOnlineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
+ // if the current task is finished, kill the post task
+ // we need to submit an online isolation task event, otherwise there may exist concurrent problem
+ for (String isolationTaskCodeStr : needToOnlineIsolationTaskCodes) {
+ Long isolationTaskCode = Long.valueOf(isolationTaskCodeStr);
+ isolationTaskCodesToTimesMap.put(isolationTaskCode,
+ isolationTaskCodesToTimesMap.getOrDefault(isolationTaskCode, 0) + 1);
+ ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
+ if (iTaskProcessor == null) {
+ logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}",
+ isolationTaskCode);
+ continue;
+ }
+ iTaskProcessor.action(TaskAction.ISOLATE);
+ StateEvent stateEvent = new StateEvent();
+ stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+ stateEvent.setProcessInstanceId(this.processInstance.getId());
+ stateEvent.setTaskInstanceId(iTaskProcessor.taskInstance().getId());
+ stateEvent.setExecutionStatus(iTaskProcessor.taskInstance().getState());
+ this.addStateEvent(stateEvent);
+ }
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
+ }
+ }
+
+ public void cancelTaskIsolation(long taskCode) {
+ try {
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+ // todo:
+ // restart the killed/ paused task
+ Set<String> needToOfflineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
+ for (String needToOfflineIsolationTaskCodeStr : needToOfflineIsolationTaskCodes) {
+ Long isolationTaskCode = Long.valueOf(needToOfflineIsolationTaskCodeStr);
+ Integer isolateTimes = isolationTaskCodesToTimesMap.get(isolationTaskCode);
+ if (isolateTimes == null) {
+ logger.warn(
+ "The current task has not been isolated, so it don't need to offline isolation, taskCode: {}",
+ isolationTaskCode);
+ continue;
+ }
+ if (isolateTimes == 1) {
+ isolationTaskCodesToTimesMap.remove(isolationTaskCode);
+ ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
+ if (iTaskProcessor == null) {
+ // the current task has not been submitted
+ continue;
+ }
+ // the current task has no pre isolation, restart
+ TaskInstance taskInstance = iTaskProcessor.taskInstance();
+ if (taskInstance.getState() == ExecutionStatus.PAUSE_BY_ISOLATION) {
+ taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+ addTaskToStandByList(taskInstance);
+ logger.info(
+ "Cancel isolation task, the current task state is pause_by_isolation, change to submitted_success and add it back to standbyList");
+ continue;
+ }
+ if (taskInstance.getState() == ExecutionStatus.KILL_BY_ISOLATION) {
+ addTaskToStandByList(cloneCancelIsolationTaskInstance(taskInstance));
+ logger.info(
+ "Cancel isolation task, the current task state is kill_by_isolation, change to submitted_success and add it back to standbyList");
+ continue;
+ }
+ } else {
+ isolationTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1);
+ }
+
+ }
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
+ }
+
+ }
+
/**
* crate new task instance to retry, different objects from the original
*
@@ -798,6 +886,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
+ List<IsolationTask> isolationTasks =
+ isolationTaskDao.queryByWorkflowInstanceId(processInstance.getId(), IsolationTaskStatus.ONLINE);
+ for (IsolationTask isolationTask : isolationTasks) {
+ Set<String> allPostNodes = DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag);
+ allPostNodes.forEach(postNode -> {
+ isolationTaskCodesToTimesMap.put(isolationTask.getTaskCode(),
+ isolationTaskCodesToTimesMap.getOrDefault(isolationTask.getTaskCode(), 0) + 1);
+ });
+ }
logger.info("Build dag success, dag: {}", dag);
}
@@ -933,6 +1030,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
try {
+ if (isolationTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) {
+ taskInstance.setState(ExecutionStatus.PAUSE_BY_ISOLATION);
+ logger.info("The current task has been isolated, will set status to PAUSE_BY_ISOLATION, taskCode: {}",
+ taskInstance.getTaskCode());
+ }
+
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
@@ -1117,6 +1220,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return newTaskInstance;
}
+ private @Nullable TaskInstance cloneCancelIsolationTaskInstance(TaskInstance taskInstance) {
+ TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
+ if (taskNode == null) {
+ logger.error("clone retry task instance error, taskNode is null, code:{}", taskInstance.getTaskCode());
+ return null;
+ }
+ TaskInstance newTaskInstance = TaskInstanceUtils.cloneTaskInstance(processInstance, taskNode, taskInstance);
+ TaskInstanceUtils.injectEnvironment(taskInstance, getTaskInstanceEnvironment(newTaskInstance));
+ return newTaskInstance;
+ }
+
private @Nullable Environment getTaskInstanceEnvironment(@NonNull TaskInstance taskInstance) {
Environment environment = null;
if (!taskInstance.getEnvironmentCode().equals(-1L)) {
@@ -1468,17 +1582,21 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
// success
if (state == ExecutionStatus.RUNNING_EXECUTION) {
- List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
// tasks currently pending submission, no retries, indicating that depend is waiting to complete
return ExecutionStatus.RUNNING_EXECUTION;
- } else if (CollectionUtils.isNotEmpty(killTasks)) {
+ }
+ if (CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.KILL))) {
// tasks maybe killed manually
return ExecutionStatus.FAILURE;
- } else {
- // if the waiting queue is empty and the status is in progress, then success
- return ExecutionStatus.SUCCESS;
}
+ if (CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.KILL_BY_ISOLATION))
+ || CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.PAUSE_BY_ISOLATION))) {
+ // No task need to submit, and exist isolation task, the workflow instance need to be PAUSE_BY_ISOLATION
+ return ExecutionStatus.PAUSE_BY_ISOLATION;
+ }
+ // if the waiting queue is empty and the status is in progress, then success
+ return ExecutionStatus.SUCCESS;
}
return state;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index ad3d42a3a5..e86a93ffc1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
@@ -111,6 +112,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected boolean timeout = false;
+ protected boolean isolated = false;
+
protected TaskInstance taskInstance = null;
protected ProcessInstance processInstance;
@@ -160,6 +163,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
*/
protected abstract boolean killTask();
+ protected abstract boolean isolateTask();
+
protected abstract boolean taskTimeout();
protected abstract boolean runTask();
@@ -193,6 +198,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
case DISPATCH:
result = dispatch();
break;
+ case ISOLATE:
+ result = isolate();
+ break;
default:
logger.error("unknown task action: {}", taskAction);
}
@@ -256,6 +264,14 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return paused;
}
+ private boolean isolate() {
+ if (isolated) {
+ return true;
+ }
+ isolated = isolateTask();
+ return isolated;
+ }
+
protected boolean stop() {
if (killed) {
return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index 5fcad28858..3285623186 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -21,7 +21,6 @@ import com.google.auto.service.AutoService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.BlockingOpportunity;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -30,7 +29,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
-import org.apache.dolphinscheduler.server.utils.LogUtils;
import java.util.ArrayList;
import java.util.Date;
@@ -89,6 +87,15 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean isolateTask() {
+ taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+ taskInstance.setEndTime(new Date());
+ processService.saveTaskInstance(taskInstance);
+ logger.info("Blocking task has been isolated");
+ return true;
+ }
+
@Override
protected boolean taskTimeout() {
return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 7b45be6373..e1ef11eef3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.task;
+import com.google.auto.service.AutoService;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -34,8 +35,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import java.util.Date;
-import com.google.auto.service.AutoService;
-
/**
* common task processor
*/
@@ -130,6 +129,27 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean isolateTask() {
+ try {
+ if (taskInstance.getState().typeIsFinished()) {
+ return true;
+ }
+ taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+ taskInstance.setEndTime(new Date());
+ processService.updateTaskInstance(taskInstance);
+ if (StringUtils.isNotEmpty(taskInstance.getHost())) {
+ killRemoteTask();
+ }
+ logger.info("Master isolate taskInstance success, taskName: {} taskInstanceId: {}", taskInstance.getName(),
+ taskInstance.getId());
+ return true;
+ } catch (Exception e) {
+ logger.error("Master isolate task error, taskInstance id: {}", taskInstance.getId(), e);
+ return false;
+ }
+ }
+
@Override
protected boolean taskTimeout() {
return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 7d3669c443..5c76496aa3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import com.google.auto.service.AutoService;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -27,7 +26,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
-import org.apache.dolphinscheduler.server.utils.LogUtils;
import java.util.ArrayList;
import java.util.Date;
@@ -106,6 +104,15 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean isolateTask() {
+ taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+ taskInstance.setEndTime(new Date());
+ processService.saveTaskInstance(taskInstance);
+ logger.info("Condition task has been isolated");
+ return true;
+ }
+
@Override
public String getType() {
return TASK_TYPE_CONDITIONS;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 6dc5cc6b7f..d5fe056820 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -202,6 +202,14 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean isolateTask() {
+ taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+ taskInstance.setEndTime(new Date());
+ processService.saveTaskInstance(taskInstance);
+ return true;
+ }
+
/**
* judge all dependent tasks finish
*
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 754eba7d50..a681366ea5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -201,6 +201,17 @@ public class SubTaskProcessor extends BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean isolateTask() {
+ ProcessInstance subProcessInstance =
+ processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
+ if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) {
+ return true;
+ }
+ // todo: isolate sub process
+ return true;
+ }
+
private void sendToSubProcess() {
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index e3ba4c4db0..8ed3c42ce8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -85,6 +85,14 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean isolateTask() {
+ taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
+ taskInstance.setEndTime(new Date());
+ processService.saveTaskInstance(taskInstance);
+ return true;
+ }
+
@Override
protected boolean taskTimeout() {
return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
index 9044945258..2c60d364bc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
@@ -22,6 +22,7 @@ package org.apache.dolphinscheduler.server.master.runner.task;
*/
public enum TaskAction {
PAUSE,
+ ISOLATE,
STOP,
TIMEOUT,
SUBMIT,
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
new file mode 100644
index 0000000000..2621766d84
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java
@@ -0,0 +1,97 @@
+package org.apache.dolphinscheduler.server.master.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
+import org.apache.dolphinscheduler.dao.entity.IsolationTask;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+public class IsolationTaskManager {
+
+ @Autowired
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+ private volatile Map<Long, IsolationTask> onlineIsolationTasksInMemory = new HashMap<>();
+
+ private volatile Set<Long> currentIsolationTaskIdsInMemory = new HashSet<>();
+
+ @Autowired
+ private IsolationTaskDao isolationTaskDao;
+
+ @PostConstruct
+ public void init() {
+ refreshIsolationTaskMapFromDB();
+ }
+
+ public void refreshIsolationTaskMapFromDB() {
+ Map<Long, IsolationTask> totalOnlineIsolationTasksInDB =
+ isolationTaskDao.queryByStatus(IsolationTaskStatus.ONLINE)
+ .stream()
+ .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
+ Set<Long> totalOnlineIsolationTaskCodesInDB = totalOnlineIsolationTasksInDB.keySet();
+
+ Collection<IsolationTask> needToOfflineIsolationTasks =
+ CollectionUtils.subtract(currentIsolationTaskIdsInMemory, totalOnlineIsolationTaskCodesInDB)
+ .stream()
+ .map(onlineIsolationTasksInMemory::get)
+ .collect(Collectors.toList());
+
+ Collection<IsolationTask> needToOnlineIsolationTasks =
+ CollectionUtils.subtract(totalOnlineIsolationTaskCodesInDB, currentIsolationTaskIdsInMemory)
+ .stream()
+ .map(totalOnlineIsolationTasksInDB::get)
+ .collect(Collectors.toList());
+
+ currentIsolationTaskIdsInMemory = totalOnlineIsolationTaskCodesInDB;
+ onlineIsolationTasksInMemory = totalOnlineIsolationTasksInDB;
+
+ offlineIsolationTask(needToOfflineIsolationTasks);
+ onlineIsolationTask(needToOnlineIsolationTasks);
+ }
+
+ private void offlineIsolationTask(Collection<IsolationTask> needOfflineIsolationTasks) {
+ if (CollectionUtils.isEmpty(needOfflineIsolationTasks)) {
+ return;
+ }
+ for (IsolationTask needOfflineIsolation : needOfflineIsolationTasks) {
+ WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
+ .getByProcessInstanceId(needOfflineIsolation.getWorkflowInstanceId());
+ if (workflowExecuteRunnable == null) {
+ continue;
+ }
+ workflowExecuteRunnable.cancelTaskIsolation(needOfflineIsolation.getTaskCode());
+ log.info("Backend offline isolation task, isolationTaskId: {}", needOfflineIsolation.getId());
+ }
+ }
+
+ private void onlineIsolationTask(Collection<IsolationTask> needOnlineIsolationTasks) {
+ if (CollectionUtils.isEmpty(needOnlineIsolationTasks)) {
+ return;
+ }
+ for (IsolationTask needOnlineIsolationTask : needOnlineIsolationTasks) {
+ WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
+ .getByProcessInstanceId(needOnlineIsolationTask.getWorkflowInstanceId());
+ if (workflowExecuteRunnable == null) {
+ continue;
+ }
+ workflowExecuteRunnable.onlineTaskIsolation(needOnlineIsolationTask.getTaskCode());
+ log.info("Backend online isolation task, isolationTaskId: {}", needOnlineIsolationTask.getId());
+ }
+ }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java
index 5d86abc8e4..e4bdf3511c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java
@@ -166,7 +166,6 @@ public class TaskInstanceUtils {
taskInstance.setTaskDefine(oldTaskInstance.getTaskDefine());
taskInstance.setProcessDefine(oldTaskInstance.getProcessDefine());
taskInstance.setProcessInstance(processInstance);
- taskInstance.setState(oldTaskInstance.getState());
return taskInstance;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
index e1088bf83d..00ae91019d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
@@ -36,6 +36,8 @@ public class WorkflowInstanceUtils {
return getStartTaskInstanceIdsFromRecoverParam(processInstance);
case START_FROM_STATE_CLEAN_TASKS:
return getStartTaskInstanceIdsFromStateCleanParam(processInstance);
+ case RECOVERY_FROM_ISOLATION_TASKS:
+ return getStartTaskInstanceIdsFromRecoverIsolationParam(processInstance);
default:
return Collections.emptyList();
}
@@ -69,6 +71,22 @@ public class WorkflowInstanceUtils {
return stateCleanTaskInstanceIds;
}
+ public static List<Integer> getStartTaskInstanceIdsFromRecoverIsolationParam(@NonNull ProcessInstance processInstance) {
+ Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
+ List<Integer> recoveryPausedIsolationIds =
+ JSONUtils.parseObject(commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
+ new TypeReference<ArrayList<Integer>>() {
+ });
+ List<Integer> recoveryKilledIsolationIds =
+ JSONUtils.parseObject(commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
+ new TypeReference<ArrayList<Integer>>() {
+ });
+ List<Integer> result = new ArrayList<>();
+ result.addAll(recoveryPausedIsolationIds);
+ result.addAll(recoveryKilledIsolationIds);
+ return result;
+ }
+
public static List<String> getStartNodeName(@NonNull ProcessInstance processInstance) {
List<String> startNodeNameList = new ArrayList<>();
Map<String, String> paramMap = JSONUtils.toMap(processInstance.getCommandParam());
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 6f55e1d5b5..856d38cb2f 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -27,6 +27,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
@@ -92,6 +93,8 @@ public class WorkflowExecuteRunnableTest {
private CuringParamsService curingGlobalParamsService;
+ private IsolationTaskDao isolationTaskDao;
+
@Before
public void init() throws Exception {
applicationContext = mock(ApplicationContext.class);
@@ -124,10 +127,11 @@ public class WorkflowExecuteRunnableTest {
curingGlobalParamsService = mock(CuringParamsService.class);
NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
+ isolationTaskDao = mock(IsolationTaskDao.class);
workflowExecuteThread =
PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao,
nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread,
- curingGlobalParamsService));
+ curingGlobalParamsService, isolationTaskDao));
// prepareProcess init dag
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 37f0b97f22..daf92071b5 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -119,5 +119,9 @@ public enum CommandType {
/**
* workflow executing data response, from master to api
*/
- WORKFLOW_EXECUTING_DATA_RESPONSE;
+ WORKFLOW_EXECUTING_DATA_RESPONSE,
+
+ REFRESH_ISOLATION_REQUEST,
+ ;
+
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
new file mode 100644
index 0000000000..9b097843c4
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.remote.command.isolation;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.io.Serializable;
+
+public class RefreshIsolationTaskRequest implements Serializable {
+
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.REFRESH_ISOLATION_REQUEST);
+ byte[] body = JSONUtils.toJsonByteArray(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 009b872681..c1fe78e78a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1047,6 +1047,27 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.setRunTimes(runTime + 1);
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
break;
+ case RECOVERY_FROM_ISOLATION_TASKS:
+ Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
+ List<Integer> recoveryPausedIsolationIds =
+ JSONUtils.parseObject(
+ commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
+ new TypeReference<ArrayList<Integer>>() {
+ });
+ if (CollectionUtils.isNotEmpty(recoveryPausedIsolationIds)) {
+ recoveryPausedIsolationIds.forEach(id -> initTaskInstance(findTaskInstanceById(id)));
+ }
+ List<Integer> recoveryKilledIsolationIds =
+ JSONUtils.parseObject(
+ commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
+ new TypeReference<ArrayList<Integer>>() {
+ });
+ if (CollectionUtils.isNotEmpty(recoveryKilledIsolationIds)) {
+ recoveryKilledIsolationIds.forEach(id -> initTaskInstance(findTaskInstanceById(id)));
+ }
+ processInstance.setRunTimes(runTime + 1);
+ processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ break;
default:
break;
}
@@ -1564,9 +1585,6 @@ public class ProcessServiceImpl implements ProcessService {
taskInstance.getTaskCode());
return null;
}
- if (processInstanceState == ExecutionStatus.READY_PAUSE) {
- taskInstance.setState(ExecutionStatus.PAUSE);
- }
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
if (taskInstance.getSubmitTime() == null) {
@@ -1603,7 +1621,8 @@ public class ProcessServiceImpl implements ProcessService {
if (state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
- || state == ExecutionStatus.DISPATCH) {
+ || state == ExecutionStatus.DISPATCH
+ || state == ExecutionStatus.PAUSE_BY_ISOLATION) {
return state;
}
// return pasue /stop if process instance state is ready pause / stop
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
index da728b2fdb..a83184e16b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
@@ -17,10 +17,10 @@
package org.apache.dolphinscheduler.plugin.task.api.enums;
-import java.util.HashMap;
-
import com.baomidou.mybatisplus.annotation.EnumValue;
+import java.util.HashMap;
+
/**
* running status for workflow and task nodes
*/
@@ -65,6 +65,8 @@ public enum ExecutionStatus {
READY_BLOCK(15, "ready block"),
BLOCK(16, "block"),
DISPATCH(17, "dispatch"),
+ PAUSE_BY_ISOLATION(18, "paused by isolation"),
+ KILL_BY_ISOLATION(19, "killed by isolation"),
;
ExecutionStatus(int code, String descp) {
@@ -117,10 +119,14 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsFinished() {
- return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
+ return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() || typeIsPauseByIsolation()
|| typeIsStop() || typeIsBlock();
}
+ public boolean typeIsReady() {
+ return this == READY_PAUSE || this == READY_STOP || this == READY_BLOCK;
+ }
+
/**
* status is waiting thread
*
@@ -139,6 +145,18 @@ public enum ExecutionStatus {
return this == PAUSE;
}
+ public boolean typeIsPauseByIsolation() {
+ return this == PAUSE_BY_ISOLATION;
+ }
+
+ public boolean typeIsKilledByIsolation() {
+ return this == KILL_BY_ISOLATION;
+ }
+
+ public boolean typeIsIsolated() {
+ return this == PAUSE_BY_ISOLATION || this == KILL_BY_ISOLATION;
+ }
+
/**
* status is pause
*
@@ -172,7 +190,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsCancel() {
- return this == KILL || this == STOP;
+ return this == KILL || this == STOP || this == KILL_BY_ISOLATION;
}
public int getCode() {