You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/04/07 04:21:41 UTC
[dolphinscheduler] branch dev updated: [optimization] [Service] Optimization ProcessService and add ProcessService interface (#9370)
This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5ef3f9d668 [optimization] [Service] Optimization ProcessService and add ProcessService interface (#9370)
5ef3f9d668 is described below
commit 5ef3f9d6681d8d150bfee5b77b7929f306fbdc8a
Author: gaojun2048 <ga...@gmail.com>
AuthorDate: Thu Apr 7 12:21:34 2022 +0800
[optimization] [Service] Optimization ProcessService and add ProcessService interface (#9370)
---
.../service/process/ProcessService.java | 2985 ++------------------
...ProcessService.java => ProcessServiceImpl.java} | 240 +-
.../service/process/ProcessServiceTest.java | 33 +-
3 files changed, 390 insertions(+), 2868 deletions(-)
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 2255c0811b..395aa31668 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -17,38 +17,11 @@
package org.apache.dolphinscheduler.service.process;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
-
-import static java.util.stream.Collectors.toSet;
-
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
-import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.FailureStrategy;
-import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.ReleaseState;
-import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
-import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.common.process.ProcessDag;
-import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
-import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
@@ -58,9 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
import org.apache.dolphinscheduler.dao.entity.DqRule;
import org.apache.dolphinscheduler.dao.entity.DqRuleExecuteSql;
import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
-import org.apache.dolphinscheduler.dao.entity.DqTaskStatisticsValue;
import org.apache.dolphinscheduler.dao.entity.Environment;
-import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -73,2810 +44,256 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
-import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
-import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
-import org.apache.dolphinscheduler.dao.mapper.DqComparisonTypeMapper;
-import org.apache.dolphinscheduler.dao.mapper.DqExecuteResultMapper;
-import org.apache.dolphinscheduler.dao.mapper.DqRuleExecuteSqlMapper;
-import org.apache.dolphinscheduler.dao.mapper.DqRuleInputEntryMapper;
-import org.apache.dolphinscheduler.dao.mapper.DqRuleMapper;
-import org.apache.dolphinscheduler.dao.mapper.DqTaskStatisticsValueMapper;
-import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
-import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
-import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
-import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
-import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
-import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
-import org.apache.dolphinscheduler.dao.utils.DagHelper;
-import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
-import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
-import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
-import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
-import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
-import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.exceptions.ServiceException;
-import org.apache.dolphinscheduler.service.log.LogClientService;
-import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
+import org.slf4j.Logger;
+import org.springframework.transaction.annotation.Transactional;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Date;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
+public interface ProcessService {
+ @Transactional
+ ProcessInstance handleCommand(Logger logger, String host, Command command);
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
+ void moveToErrorCommand(Command command, String message);
-/**
- * process relative dao that some mappers in this.
- */
-@Component
-public class ProcessService {
+ int createCommand(Command command);
- private final Logger logger = LoggerFactory.getLogger(getClass());
+ List<Command> findCommandPage(int pageSize, int pageNumber);
- private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
- ExecutionStatus.DISPATCH.ordinal(),
- ExecutionStatus.RUNNING_EXECUTION.ordinal(),
- ExecutionStatus.DELAY_EXECUTION.ordinal(),
- ExecutionStatus.READY_PAUSE.ordinal(),
- ExecutionStatus.READY_STOP.ordinal()};
+ List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot);
- @Autowired
- private UserMapper userMapper;
+ boolean verifyIsNeedCreateCommand(Command command);
- @Autowired
- private ProcessDefinitionMapper processDefineMapper;
+ ProcessInstance findProcessInstanceDetailById(int processId);
- @Autowired
- private ProcessDefinitionLogMapper processDefineLogMapper;
+ List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);
- @Autowired
- private ProcessInstanceMapper processInstanceMapper;
+ ProcessInstance findProcessInstanceById(int processId);
- @Autowired
- private DataSourceMapper dataSourceMapper;
+ ProcessDefinition findProcessDefineById(int processDefinitionId);
- @Autowired
- private ProcessInstanceMapMapper processInstanceMapMapper;
+ ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version);
- @Autowired
- private TaskInstanceMapper taskInstanceMapper;
+ ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode);
- @Autowired
- private CommandMapper commandMapper;
+ int deleteWorkProcessInstanceById(int processInstanceId);
- @Autowired
- private ScheduleMapper scheduleMapper;
+ int deleteAllSubWorkProcessByParentId(int processInstanceId);
- @Autowired
- private UdfFuncMapper udfFuncMapper;
+ void removeTaskLogFile(Integer processInstanceId);
- @Autowired
- private ResourceMapper resourceMapper;
+ void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId);
- @Autowired
- private ResourceUserMapper resourceUserMapper;
+ void recurseFindSubProcess(long parentCode, List<Long> ids);
- @Autowired
- private ErrorCommandMapper errorCommandMapper;
+ void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance);
- @Autowired
- private TenantMapper tenantMapper;
+ Tenant getTenantForProcess(int tenantId, int userId);
- @Autowired
- private ProjectMapper projectMapper;
+ Environment findEnvironmentByCode(Long environmentCode);
- @Autowired
- private DqExecuteResultMapper dqExecuteResultMapper;
+ void setSubProcessParam(ProcessInstance subProcessInstance);
- @Autowired
- private DqRuleMapper dqRuleMapper;
+ TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval);
- @Autowired
- private DqRuleInputEntryMapper dqRuleInputEntryMapper;
+ @Transactional(rollbackFor = Exception.class)
+ TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance);
- @Autowired
- private DqRuleExecuteSqlMapper dqRuleExecuteSqlMapper;
+ void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task);
- @Autowired
- private DqComparisonTypeMapper dqComparisonTypeMapper;
+ Map<String, String> getGlobalParamMap(String globalParams);
- @Autowired
- private DqTaskStatisticsValueMapper dqTaskStatisticsValueMapper;
+ Command createSubProcessCommand(ProcessInstance parentProcessInstance,
+ ProcessInstance childInstance,
+ ProcessInstanceMap instanceMap,
+ TaskInstance task);
- @Autowired
- private TaskDefinitionMapper taskDefinitionMapper;
+ TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance);
- @Autowired
- private TaskDefinitionLogMapper taskDefinitionLogMapper;
+ ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance);
- @Autowired
- private ProcessTaskRelationMapper processTaskRelationMapper;
+ void saveProcessInstance(ProcessInstance processInstance);
- @Autowired
- private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+ int saveCommand(Command command);
+ boolean saveTaskInstance(TaskInstance taskInstance);
- @Autowired
- StateEventCallbackService stateEventCallbackService;
+ boolean createTaskInstance(TaskInstance taskInstance);
- @Autowired
- private EnvironmentMapper environmentMapper;
+ boolean updateTaskInstance(TaskInstance taskInstance);
- @Autowired
- private TaskGroupQueueMapper taskGroupQueueMapper;
+ TaskInstance findTaskInstanceById(Integer taskId);
- @Autowired
- private TaskGroupMapper taskGroupMapper;
+ List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList);
- @Autowired
- private WorkFlowLineageMapper workFlowLineageMapper;
+ void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance);
- @Autowired
- private TaskPluginManager taskPluginManager;
+ void updateTaskDefinitionResources(TaskDefinition taskDefinition);
+
+ List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state);
+
+ List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId);
+
+ List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId);
+
+ int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
+
+ int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
+
+ ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId);
+
+ int deleteWorkProcessMapByParentId(int parentWorkProcessId);
+
+ ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId);
+
+ ProcessInstance findParentProcessInstance(Integer subProcessId);
+
+ int updateProcessInstance(ProcessInstance processInstance);
+
+ void changeOutParam(TaskInstance taskInstance);
+
+ List<String> convertIntListToString(List<Integer> intList);
+
+ Schedule querySchedule(int id);
+
+ List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode);
+
+ Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList);
+
+ List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode);
+
+ List<ProcessInstance> queryNeedFailoverProcessInstances(String host);
+
+ List<String> queryNeedFailoverProcessInstanceHost();
- /**
- * handle Command (construct ProcessInstance from Command) , wrapped in transaction
- *
- * @param logger logger
- * @param host host
- * @param command found command
- * @return process instance
- */
- @Transactional
- public ProcessInstance handleCommand(Logger logger, String host, Command command) {
- ProcessInstance processInstance = constructProcessInstance(command, host);
- // cannot construct process instance, return null
- if (processInstance == null) {
- logger.error("scan command, command parameter is error: {}", command);
- moveToErrorCommand(command, "process instance is null");
- return null;
- }
- processInstance.setCommandType(command.getCommandType());
- processInstance.addHistoryCmd(command.getCommandType());
- //if the processDefinition is serial
- ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
- if (processDefinition.getExecutionType().typeIsSerial()) {
- saveSerialProcess(processInstance, processDefinition);
- if (processInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
- setSubProcessParam(processInstance);
- deleteCommandWithCheck(command.getId());
- return null;
- }
- } else {
- saveProcessInstance(processInstance);
- }
- setSubProcessParam(processInstance);
- deleteCommandWithCheck(command.getId());
- return processInstance;
- }
-
- protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinition processDefinition) {
- processInstance.setState(ExecutionStatus.SERIAL_WAIT);
- saveProcessInstance(processInstance);
- //serial wait
- //when we get the running instance(or waiting instance) only get the priority instance(by id)
- if (processDefinition.getExecutionType().typeIsSerialWait()) {
- while (true) {
- List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
- if (CollectionUtils.isEmpty(runningProcessInstances)) {
- processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
- saveProcessInstance(processInstance);
- return;
- }
- ProcessInstance runningProcess = runningProcessInstances.get(0);
- if (this.processInstanceMapper.updateNextProcessIdById(processInstance.getId(), runningProcess.getId())) {
- return;
- }
- }
- } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
- List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
- if (CollectionUtils.isEmpty(runningProcessInstances)) {
- processInstance.setState(ExecutionStatus.STOP);
- saveProcessInstance(processInstance);
- }
- } else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
- List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
- if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
- for (ProcessInstance info : runningProcessInstances) {
- info.setCommandType(CommandType.STOP);
- info.addHistoryCmd(CommandType.STOP);
- info.setState(ExecutionStatus.READY_STOP);
- int update = updateProcessInstance(info);
- // determine whether the process is normal
- if (update > 0) {
- String host = info.getHost();
- String address = host.split(":")[0];
- int port = Integer.parseInt(host.split(":")[1]);
- StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
- info.getId(), 0, info.getState(), info.getId(), 0
- );
- try {
- stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
- } catch (Exception e) {
- logger.error("sendResultError");
- }
- }
- }
- }
- }
- }
-
- /**
- * save error command, and delete original command
- *
- * @param command command
- * @param message message
- */
- public void moveToErrorCommand(Command command, String message) {
- ErrorCommand errorCommand = new ErrorCommand(command, message);
- this.errorCommandMapper.insert(errorCommand);
- this.commandMapper.deleteById(command.getId());
- }
-
- /**
- * set process waiting thread
- *
- * @param command command
- * @param processInstance processInstance
- * @return process instance
- */
- private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) {
- processInstance.setState(ExecutionStatus.WAITING_THREAD);
- if (command.getCommandType() != CommandType.RECOVER_WAITING_THREAD) {
- processInstance.addHistoryCmd(command.getCommandType());
- }
- saveProcessInstance(processInstance);
- this.setSubProcessParam(processInstance);
- createRecoveryWaitingThreadCommand(command, processInstance);
- return null;
- }
-
- /**
- * insert one command
- *
- * @param command command
- * @return create result
- */
- public int createCommand(Command command) {
- int result = 0;
- if (command != null) {
- result = commandMapper.insert(command);
- }
- return result;
- }
-
- /**
- * get command page
- */
- public List<Command> findCommandPage(int pageSize, int pageNumber) {
- return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize);
- }
-
- /**
- * get command page
- */
- public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
- if (masterCount <= 0) {
- return Lists.newArrayList();
- }
- return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
- }
-
- /**
- * check the input command exists in queue list
- *
- * @param command command
- * @return create command result
- */
- public boolean verifyIsNeedCreateCommand(Command command) {
- boolean isNeedCreate = true;
- EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
- cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
- cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
- cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
- CommandType commandType = command.getCommandType();
-
- if (cmdTypeMap.containsKey(commandType)) {
- ObjectNode cmdParamObj = JSONUtils.parseObject(command.getCommandParam());
- int processInstanceId = cmdParamObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt();
-
- List<Command> commands = commandMapper.selectList(null);
- // for all commands
- for (Command tmpCommand : commands) {
- if (cmdTypeMap.containsKey(tmpCommand.getCommandType())) {
- ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam());
- if (tempObj != null && processInstanceId == tempObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
- isNeedCreate = false;
- break;
- }
- }
- }
- }
- return isNeedCreate;
- }
-
- /**
- * find process instance detail by id
- *
- * @param processId processId
- * @return process instance
- */
- public ProcessInstance findProcessInstanceDetailById(int processId) {
- return processInstanceMapper.queryDetailById(processId);
- }
-
- /**
- * get task node list by definitionId
- */
- public List<TaskDefinition> getTaskNodeListByDefinition(long defineCode) {
- ProcessDefinition processDefinition = processDefineMapper.queryByCode(defineCode);
- if (processDefinition == null) {
- logger.error("process define not exists");
- return Lists.newArrayList();
- }
- List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
- Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
- for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
- if (processTaskRelation.getPostTaskCode() > 0) {
- taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()));
- }
- }
- if (taskDefinitionSet.isEmpty()) {
- return Lists.newArrayList();
- }
- List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
- return Lists.newArrayList(taskDefinitionLogs);
- }
-
- /**
- * find process instance by id
- *
- * @param processId processId
- * @return process instance
- */
- public ProcessInstance findProcessInstanceById(int processId) {
- return processInstanceMapper.selectById(processId);
- }
-
- /**
- * find process define by id.
- *
- * @param processDefinitionId processDefinitionId
- * @return process definition
- */
- public ProcessDefinition findProcessDefineById(int processDefinitionId) {
- return processDefineMapper.selectById(processDefinitionId);
- }
-
- /**
- * find process define by code and version.
- *
- * @param processDefinitionCode processDefinitionCode
- * @return process definition
- */
- public ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version) {
- ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
- if (processDefinition == null || processDefinition.getVersion() != version) {
- processDefinition = processDefineLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, version);
- if (processDefinition != null) {
- processDefinition.setId(0);
- }
- }
- return processDefinition;
- }
-
- /**
- * find process define by code.
- *
- * @param processDefinitionCode processDefinitionCode
- * @return process definition
- */
- public ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode) {
- return processDefineMapper.queryByCode(processDefinitionCode);
- }
-
- /**
- * delete work process instance by id
- *
- * @param processInstanceId processInstanceId
- * @return delete process instance result
- */
- public int deleteWorkProcessInstanceById(int processInstanceId) {
- return processInstanceMapper.deleteById(processInstanceId);
- }
-
- /**
- * delete all sub process by parent instance id
- *
- * @param processInstanceId processInstanceId
- * @return delete all sub process instance result
- */
- public int deleteAllSubWorkProcessByParentId(int processInstanceId) {
-
- List<Integer> subProcessIdList = processInstanceMapMapper.querySubIdListByParentId(processInstanceId);
-
- for (Integer subId : subProcessIdList) {
- deleteAllSubWorkProcessByParentId(subId);
- deleteWorkProcessMapByParentId(subId);
- removeTaskLogFile(subId);
- deleteWorkProcessInstanceById(subId);
- }
- return 1;
- }
-
- /**
- * remove task log file
- *
- * @param processInstanceId processInstanceId
- */
- public void removeTaskLogFile(Integer processInstanceId) {
- List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
- if (CollectionUtils.isEmpty(taskInstanceList)) {
- return;
- }
- try (LogClientService logClient = new LogClientService()) {
- for (TaskInstance taskInstance : taskInstanceList) {
- String taskLogPath = taskInstance.getLogPath();
- if (StringUtils.isEmpty(taskInstance.getHost())) {
- continue;
- }
- Host host = Host.of(taskInstance.getHost());
- // remove task log from loggerserver
- logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
- }
- }
- }
-
- /**
- * recursive delete all task instance by process instance id
- */
- public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) {
- List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
- if (CollectionUtils.isEmpty(taskInstanceList)) {
- return;
- }
-
- List<Integer> taskInstanceIdList = new ArrayList<>();
-
- for (TaskInstance taskInstance : taskInstanceList) {
- taskInstanceIdList.add(taskInstance.getId());
- }
-
- taskInstanceMapper.deleteBatchIds(taskInstanceIdList);
- }
-
- /**
- * recursive query sub process definition id by parent id.
- *
- * @param parentCode parentCode
- * @param ids ids
- */
- public void recurseFindSubProcess(long parentCode, List<Long> ids) {
- List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinition(parentCode);
-
- if (taskNodeList != null && !taskNodeList.isEmpty()) {
-
- for (TaskDefinition taskNode : taskNodeList) {
- String parameter = taskNode.getTaskParams();
- ObjectNode parameterJson = JSONUtils.parseObject(parameter);
- if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_CODE) != null) {
- SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class);
- ids.add(subProcessParam.getProcessDefinitionCode());
- recurseFindSubProcess(subProcessParam.getProcessDefinitionCode(), ids);
- }
- }
- }
- }
-
- /**
- * create recovery waiting thread command when thread pool is not enough for the process instance.
- * sub work process instance need not to create recovery command.
- * create recovery waiting thread command and delete origin command at the same time.
- * if the recovery command is exists, only update the field update_time
- *
- * @param originCommand originCommand
- * @param processInstance processInstance
- */
- public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
-
- // sub process doesnot need to create wait command
- if (processInstance.getIsSubProcess() == Flag.YES) {
- if (originCommand != null) {
- commandMapper.deleteById(originCommand.getId());
- }
- return;
- }
- Map<String, String> cmdParam = new HashMap<>();
- cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD, String.valueOf(processInstance.getId()));
- // process instance quit by "waiting thread" state
- if (originCommand == null) {
- Command command = new Command(
- CommandType.RECOVER_WAITING_THREAD,
- processInstance.getTaskDependType(),
- processInstance.getFailureStrategy(),
- processInstance.getExecutorId(),
- processInstance.getProcessDefinition().getCode(),
- JSONUtils.toJsonString(cmdParam),
- processInstance.getWarningType(),
- processInstance.getWarningGroupId(),
- processInstance.getScheduleTime(),
- processInstance.getWorkerGroup(),
- processInstance.getEnvironmentCode(),
- processInstance.getProcessInstancePriority(),
- processInstance.getDryRun(),
- processInstance.getId(),
- processInstance.getProcessDefinitionVersion()
- );
- saveCommand(command);
- return;
- }
-
- // update the command time if current command if recover from waiting
- if (originCommand.getCommandType() == CommandType.RECOVER_WAITING_THREAD) {
- originCommand.setUpdateTime(new Date());
- saveCommand(originCommand);
- } else {
- // delete old command and create new waiting thread command
- commandMapper.deleteById(originCommand.getId());
- originCommand.setId(0);
- originCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
- originCommand.setUpdateTime(new Date());
- originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
- originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
- saveCommand(originCommand);
- }
- }
-
- /**
- * get schedule time from command
- *
- * @param command command
- * @param cmdParam cmdParam map
- * @return date
- */
- private Date getScheduleTime(Command command, Map<String, String> cmdParam) {
- Date scheduleTime = command.getScheduleTime();
- if (scheduleTime == null
- && cmdParam != null
- && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
-
- Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
- Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
- List<Schedule> schedules = queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
- List<Date> complementDateList = CronUtils.getSelfFireDateList(start, end, schedules);
-
- if (complementDateList.size() > 0) {
- scheduleTime = complementDateList.get(0);
- } else {
- logger.error("set scheduler time error: complement date list is empty, command: {}",
- command.toString());
- }
- }
- return scheduleTime;
- }
-
- /**
- * generate a new work process instance from command.
- *
- * @param processDefinition processDefinition
- * @param command command
- * @param cmdParam cmdParam map
- * @return process instance
- */
- private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
- Command command,
- Map<String, String> cmdParam) {
- ProcessInstance processInstance = new ProcessInstance(processDefinition);
- processInstance.setProcessDefinitionCode(processDefinition.getCode());
- processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
- processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
- processInstance.setRecovery(Flag.NO);
- processInstance.setStartTime(new Date());
- processInstance.setRestartTime(processInstance.getStartTime());
- processInstance.setRunTimes(1);
- processInstance.setMaxTryTimes(0);
- processInstance.setCommandParam(command.getCommandParam());
- processInstance.setCommandType(command.getCommandType());
- processInstance.setIsSubProcess(Flag.NO);
- processInstance.setTaskDependType(command.getTaskDependType());
- processInstance.setFailureStrategy(command.getFailureStrategy());
- processInstance.setExecutorId(command.getExecutorId());
- WarningType warningType = command.getWarningType() == null ? WarningType.NONE : command.getWarningType();
- processInstance.setWarningType(warningType);
- Integer warningGroupId = command.getWarningGroupId() == null ? 0 : command.getWarningGroupId();
- processInstance.setWarningGroupId(warningGroupId);
- processInstance.setDryRun(command.getDryRun());
-
- if (command.getScheduleTime() != null) {
- processInstance.setScheduleTime(command.getScheduleTime());
- }
- processInstance.setCommandStartTime(command.getStartTime());
- processInstance.setLocations(processDefinition.getLocations());
-
- // reset global params while there are start parameters
- setGlobalParamIfCommanded(processDefinition, cmdParam);
-
- // curing global params
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime()));
-
- // set process instance priority
- processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
- String workerGroup = StringUtils.isBlank(command.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : command.getWorkerGroup();
- processInstance.setWorkerGroup(workerGroup);
- processInstance.setEnvironmentCode(Objects.isNull(command.getEnvironmentCode()) ? -1 : command.getEnvironmentCode());
- processInstance.setTimeout(processDefinition.getTimeout());
- processInstance.setTenantId(processDefinition.getTenantId());
- return processInstance;
- }
-
- private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
- // get start params from command param
- Map<String, String> startParamMap = new HashMap<>();
- if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
- String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
- startParamMap = JSONUtils.toMap(startParamJson);
- }
- Map<String, String> fatherParamMap = new HashMap<>();
- if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) {
- String fatherParamJson = cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS);
- fatherParamMap = JSONUtils.toMap(fatherParamJson);
- }
- startParamMap.putAll(fatherParamMap);
- // set start param into global params
- if (startParamMap.size() > 0
- && processDefinition.getGlobalParamMap() != null) {
- for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
- String val = startParamMap.get(param.getKey());
- if (val != null) {
- param.setValue(val);
- }
- }
- }
- }
-
- /**
- * get process tenant
- * there is tenant id in definition, use the tenant of the definition.
- * if there is not tenant id in the definiton or the tenant not exist
- * use definition creator's tenant.
- *
- * @param tenantId tenantId
- * @param userId userId
- * @return tenant
- */
- public Tenant getTenantForProcess(int tenantId, int userId) {
- Tenant tenant = null;
- if (tenantId >= 0) {
- tenant = tenantMapper.queryById(tenantId);
- }
-
- if (userId == 0) {
- return null;
- }
-
- if (tenant == null) {
- User user = userMapper.selectById(userId);
- tenant = tenantMapper.queryById(user.getTenantId());
- }
- return tenant;
- }
-
- /**
- * get an environment
- * use the code of the environment to find a environment.
- *
- * @param environmentCode environmentCode
- * @return Environment
- */
- public Environment findEnvironmentByCode(Long environmentCode) {
- Environment environment = null;
- if (environmentCode >= 0) {
- environment = environmentMapper.queryByEnvironmentCode(environmentCode);
- }
- return environment;
- }
-
- /**
- * check command parameters is valid
- *
- * @param command command
- * @param cmdParam cmdParam map
- * @return whether command param is valid
- */
- private Boolean checkCmdParam(Command command, Map<String, String> cmdParam) {
- if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) {
- if (cmdParam == null
- || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES)
- || cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) {
- logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType());
- return false;
- }
- }
- return true;
- }
-
- /**
- * construct process instance according to one command.
- *
- * @param command command
- * @param host host
- * @return process instance
- */
- protected ProcessInstance constructProcessInstance(Command command, String host) {
- ProcessInstance processInstance;
- ProcessDefinition processDefinition;
- CommandType commandType = command.getCommandType();
-
- processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
- if (processDefinition == null) {
- logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
- return null;
- }
- Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
- int processInstanceId = command.getProcessInstanceId();
- if (processInstanceId == 0) {
- processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
- } else {
- processInstance = this.findProcessInstanceDetailById(processInstanceId);
- if (processInstance == null) {
- return processInstance;
- }
- }
- if (cmdParam != null) {
- CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command);
- // reset global params while repeat running is needed by cmdParam
- if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
- setGlobalParamIfCommanded(processDefinition, cmdParam);
- }
-
- // Recalculate global parameters after rerun.
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- commandTypeIfComplement,
- processInstance.getScheduleTime()));
- processInstance.setProcessDefinition(processDefinition);
- }
- //reset command parameter
- if (processInstance.getCommandParam() != null) {
- Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
- processCmdParam.forEach((key, value) -> {
- if (!cmdParam.containsKey(key)) {
- cmdParam.put(key, value);
- }
- });
- }
- // reset command parameter if sub process
- if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
- processInstance.setCommandParam(command.getCommandParam());
- }
- if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
- logger.error("command parameter check failed!");
- return null;
- }
- if (command.getScheduleTime() != null) {
- processInstance.setScheduleTime(command.getScheduleTime());
- }
- processInstance.setHost(host);
- processInstance.setRestartTime(new Date());
- ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION;
- int runTime = processInstance.getRunTimes();
- switch (commandType) {
- case START_PROCESS:
- break;
- case START_FAILURE_TASK_PROCESS:
- // find failed tasks and init these tasks
- List<Integer> failedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FAILURE);
- List<Integer> toleranceList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE);
- List<Integer> killedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.KILL);
- cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
-
- failedList.addAll(killedList);
- failedList.addAll(toleranceList);
- for (Integer taskId : failedList) {
- initTaskInstance(this.findTaskInstanceById(taskId));
- }
- cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
- String.join(Constants.COMMA, convertIntListToString(failedList)));
- processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
- processInstance.setRunTimes(runTime + 1);
- break;
- case START_CURRENT_TASK_PROCESS:
- break;
- case RECOVER_WAITING_THREAD:
- break;
- case RECOVER_SUSPENDED_PROCESS:
- // find pause tasks and init task's state
- cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
- List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
- List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
- ExecutionStatus.KILL);
- suspendedNodeList.addAll(stopNodeList);
- for (Integer taskId : suspendedNodeList) {
- // initialize the pause state
- initTaskInstance(this.findTaskInstanceById(taskId));
- }
- cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING, String.join(",", convertIntListToString(suspendedNodeList)));
- processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
- processInstance.setRunTimes(runTime + 1);
- break;
- case RECOVER_TOLERANCE_FAULT_PROCESS:
- // recover tolerance fault process
- processInstance.setRecovery(Flag.YES);
- runStatus = processInstance.getState();
- break;
- case COMPLEMENT_DATA:
- // delete all the valid tasks when complement data if id is not null
- if (processInstance.getId() != 0) {
- List<TaskInstance> taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId());
- for (TaskInstance taskInstance : taskInstanceList) {
- taskInstance.setFlag(Flag.NO);
- this.updateTaskInstance(taskInstance);
- }
- }
- break;
- case REPEAT_RUNNING:
- // delete the recover task names from command parameter
- if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
- cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
- processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
- }
- // delete all the valid tasks when repeat running
- List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
- for (TaskInstance taskInstance : validTaskList) {
- taskInstance.setFlag(Flag.NO);
- updateTaskInstance(taskInstance);
- }
- processInstance.setStartTime(new Date());
- processInstance.setRestartTime(processInstance.getStartTime());
- processInstance.setEndTime(null);
- processInstance.setRunTimes(runTime + 1);
- initComplementDataParam(processDefinition, processInstance, cmdParam);
- break;
- case SCHEDULER:
- break;
- default:
- break;
- }
- processInstance.setState(runStatus);
- return processInstance;
- }
-
- /**
- * get process definition by command
- * If it is a fault-tolerant command, get the specified version of ProcessDefinition through ProcessInstance
- * Otherwise, get the latest version of ProcessDefinition
- *
- * @return ProcessDefinition
- */
- private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) {
- if (cmdParam != null) {
- int processInstanceId = 0;
- if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
- processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING));
- } else if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
- processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_SUB_PROCESS));
- } else if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)) {
- processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD));
- }
-
- if (processInstanceId != 0) {
- ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId);
- if (processInstance == null) {
- return null;
- }
-
- return processDefineLogMapper.queryByDefinitionCodeAndVersion(
- processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
- }
- }
-
- return processDefineMapper.queryByCode(processDefinitionCode);
- }
-
- /**
- * return complement data if the process start with complement data
- *
- * @param processInstance processInstance
- * @param command command
- * @return command type
- */
- private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
- if (CommandType.COMPLEMENT_DATA == processInstance.getCmdTypeIfComplement()) {
- return CommandType.COMPLEMENT_DATA;
- } else {
- return command.getCommandType();
- }
- }
-
- /**
- * initialize complement data parameters
- *
- * @param processDefinition processDefinition
- * @param processInstance processInstance
- * @param cmdParam cmdParam
- */
- private void initComplementDataParam(ProcessDefinition processDefinition,
- ProcessInstance processInstance,
- Map<String, String> cmdParam) {
- if (!processInstance.isComplementData()) {
- return;
- }
-
- Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
- Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
- List<Schedule> listSchedules = queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
- List<Date> complementDate = CronUtils.getSelfFireDateList(start, end, listSchedules);
-
- if (complementDate.size() > 0
- && Flag.NO == processInstance.getIsSubProcess()) {
- processInstance.setScheduleTime(complementDate.get(0));
- }
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
- }
-
- /**
- * set sub work process parameters.
- * handle sub work process instance, update relation table and command parameters
- * set sub work process flag, extends parent work process command parameters
- *
- * @param subProcessInstance subProcessInstance
- */
- public void setSubProcessParam(ProcessInstance subProcessInstance) {
- String cmdParam = subProcessInstance.getCommandParam();
- if (StringUtils.isEmpty(cmdParam)) {
- return;
- }
- Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
- // write sub process id into cmd param.
- if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
- && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
- paramMap.remove(CMD_PARAM_SUB_PROCESS);
- paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
- subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
- subProcessInstance.setIsSubProcess(Flag.YES);
- this.saveProcessInstance(subProcessInstance);
- }
- // copy parent instance user def params to sub process..
- String parentInstanceId = paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
- if (StringUtils.isNotEmpty(parentInstanceId)) {
- ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
- if (parentInstance != null) {
- subProcessInstance.setGlobalParams(
- joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
- this.saveProcessInstance(subProcessInstance);
- } else {
- logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
- }
- }
- ProcessInstanceMap processInstanceMap = JSONUtils.parseObject(cmdParam, ProcessInstanceMap.class);
- if (processInstanceMap == null || processInstanceMap.getParentProcessInstanceId() == 0) {
- return;
- }
- // update sub process id to process map table
- processInstanceMap.setProcessInstanceId(subProcessInstance.getId());
-
- this.updateWorkProcessInstanceMap(processInstanceMap);
- }
-
- /**
- * join parent global params into sub process.
- * only the keys doesn't in sub process global would be joined.
- *
- * @param parentGlobalParams parentGlobalParams
- * @param subGlobalParams subGlobalParams
- * @return global params join
- */
- private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
-
- // Since JSONUtils.toList return unmodified list, we need to creat a new List here.
- List<Property> parentParams = Lists.newArrayList(JSONUtils.toList(parentGlobalParams, Property.class));
- List<Property> subParams = JSONUtils.toList(subGlobalParams, Property.class);
-
- Set<String> parentParamKeys = parentParams.stream().map(Property::getProp).collect(toSet());
-
- // We will combine the params of parent workflow and sub workflow
- // If the params are defined in both, we will use parent's params to override the sub workflow(ISSUE-7962)
- // todo: Do we need to consider the other attribute of Property?
- // e.g. the subProp's type is not equals with parent, or subProp's direct is not equals with parent
- // It's suggested to add node name in property, this kind of problem can be solved.
- List<Property> extraSubParams = subParams.stream()
- .filter(subProp -> !parentParamKeys.contains(subProp.getProp())).collect(Collectors.toList());
- parentParams.addAll(extraSubParams);
- return JSONUtils.toJsonString(parentParams);
- }
-
- /**
- * initialize task instance
- *
- * @param taskInstance taskInstance
- */
- private void initTaskInstance(TaskInstance taskInstance) {
-
- if (!taskInstance.isSubProcess()
- && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
- taskInstance.setFlag(Flag.NO);
- updateTaskInstance(taskInstance);
- return;
- }
- taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
- updateTaskInstance(taskInstance);
- }
-
- /**
- * retry submit task to db
- */
- public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
- int retryTimes = 1;
- TaskInstance task = null;
- while (retryTimes <= commitRetryTimes) {
- try {
- // submit task to db
- task = SpringApplicationContext.getBean(ProcessService.class).submitTask(processInstance, taskInstance);
- if (task != null && task.getId() != 0) {
- break;
- }
- logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
- Thread.sleep(commitInterval);
- } catch (Exception e) {
- logger.error("task commit to db failed", e);
- }
- retryTimes += 1;
- }
- return task;
- }
-
- /**
- * submit task to db
- * submit sub process to command
- *
- * @param processInstance processInstance
- * @param taskInstance taskInstance
- * @return task instance
- */
- @Transactional(rollbackFor = Exception.class)
- public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
- logger.info("start submit task : {}, instance id:{}, state: {}",
- taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
- //submit to db
- TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
- if (task == null) {
- logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
- taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
- return null;
- }
-
- if (!task.getState().typeIsFinished()) {
- createSubWorkProcess(processInstance, task);
- }
-
- logger.info("end submit task to db successfully:{} {} state:{} complete, instance id:{} state: {} ",
- taskInstance.getId(), taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
- return task;
- }
-
- /**
- * set work process instance map
- * consider o
- * repeat running does not generate new sub process instance
- * set map {parent instance id, task instance id, 0(child instance id)}
- *
- * @param parentInstance parentInstance
- * @param parentTask parentTask
- * @return process instance map
- */
- private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
- ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
- if (processMap != null) {
- return processMap;
- }
- if (parentInstance.getCommandType() == CommandType.REPEAT_RUNNING) {
- // update current task id to map
- processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
- if (processMap != null) {
- processMap.setParentTaskInstanceId(parentTask.getId());
- updateWorkProcessInstanceMap(processMap);
- return processMap;
- }
- }
- // new task
- processMap = new ProcessInstanceMap();
- processMap.setParentProcessInstanceId(parentInstance.getId());
- processMap.setParentTaskInstanceId(parentTask.getId());
- createWorkProcessInstanceMap(processMap);
- return processMap;
- }
-
- /**
- * find previous task work process map.
- *
- * @param parentProcessInstance parentProcessInstance
- * @param parentTask parentTask
- * @return process instance map
- */
- private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
- TaskInstance parentTask) {
-
- Integer preTaskId = 0;
- List<TaskInstance> preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
- for (TaskInstance task : preTaskList) {
- if (task.getName().equals(parentTask.getName())) {
- preTaskId = task.getId();
- ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
- if (map != null) {
- return map;
- }
- }
- }
- logger.info("sub process instance is not found,parent task:{},parent instance:{}",
- parentTask.getId(), parentProcessInstance.getId());
- return null;
- }
-
- /**
- * create sub work process command
- *
- * @param parentProcessInstance parentProcessInstance
- * @param task task
- */
- public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
- if (!task.isSubProcess()) {
- return;
- }
- //check create sub work flow firstly
- ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
- if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) {
- // recover failover tolerance would not create a new command when the sub command already have been created
- return;
- }
- instanceMap = setProcessInstanceMap(parentProcessInstance, task);
- ProcessInstance childInstance = null;
- if (instanceMap.getProcessInstanceId() != 0) {
- childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
- }
- Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
- updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode());
- initSubInstanceState(childInstance);
- createCommand(subProcessCommand);
- logger.info("sub process command created: {} ", subProcessCommand);
- }
-
- /**
- * complement data needs transform parent parameter to child.
- */
- protected String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance, Map<String, String> fatherParams) {
- // set sub work process command
- String processMapStr = JSONUtils.toJsonString(instanceMap);
- Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
- if (parentProcessInstance.isComplementData()) {
- Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
- String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
- String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
- processMapStr = JSONUtils.toJsonString(cmdParam);
- }
- if (fatherParams.size() != 0) {
- cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams));
- processMapStr = JSONUtils.toJsonString(cmdParam);
- }
- return processMapStr;
- }
-
- public Map<String, String> getGlobalParamMap(String globalParams) {
- List<Property> propList;
- Map<String, String> globalParamMap = new HashMap<>();
- if (StringUtils.isNotEmpty(globalParams)) {
- propList = JSONUtils.toList(globalParams, Property.class);
- globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
- }
-
- return globalParamMap;
- }
-
- /**
- * create sub work process command
- */
- public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
- ProcessInstance childInstance,
- ProcessInstanceMap instanceMap,
- TaskInstance task) {
- CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
- Map<String, Object> subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class);
- long childDefineCode = 0L;
- if (subProcessParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)) {
- childDefineCode = NumberUtils.toLong(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)));
- }
- ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode);
-
- Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
- List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
- Map<String, String> globalMap = this.getGlobalParamMap(task.getVarPool());
- Map<String, String> fatherParams = new HashMap<>();
- if (CollectionUtils.isNotEmpty(allParam)) {
- for (Property info : allParam) {
- if (Direct.OUT == info.getDirect()) {
- continue;
- }
- fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
- }
- }
- String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
- int subProcessInstanceId = childInstance == null ? 0 : childInstance.getId();
- return new Command(
- commandType,
- TaskDependType.TASK_POST,
- parentProcessInstance.getFailureStrategy(),
- parentProcessInstance.getExecutorId(),
- subProcessDefinition.getCode(),
- processParam,
- parentProcessInstance.getWarningType(),
- parentProcessInstance.getWarningGroupId(),
- parentProcessInstance.getScheduleTime(),
- task.getWorkerGroup(),
- task.getEnvironmentCode(),
- parentProcessInstance.getProcessInstancePriority(),
- parentProcessInstance.getDryRun(),
- subProcessInstanceId,
- subProcessDefinition.getVersion()
- );
- }
-
- /**
- * initialize sub work flow state
- * child instance state would be initialized when 'recovery from pause/stop/failure'
- */
- private void initSubInstanceState(ProcessInstance childInstance) {
- if (childInstance != null) {
- childInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
- updateProcessInstance(childInstance);
- }
- }
-
- /**
- * get sub work flow command type
- * child instance exist: child command = fatherCommand
- * child instance not exists: child command = fatherCommand[0]
- */
- private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
- CommandType commandType = parentProcessInstance.getCommandType();
- if (childInstance == null) {
- String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
- commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
- }
- return commandType;
- }
-
- /**
- * update sub process definition
- *
- * @param parentProcessInstance parentProcessInstance
- * @param childDefinitionCode childDefinitionId
- */
- private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
- ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
- parentProcessInstance.getProcessDefinitionVersion());
- ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode);
- if (childDefinition != null && fatherDefinition != null) {
- childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());
- processDefineMapper.updateById(childDefinition);
- }
- }
-
- /**
- * submit task to mysql
- *
- * @param taskInstance taskInstance
- * @param processInstance processInstance
- * @return task instance
- */
- public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
- ExecutionStatus processInstanceState = processInstance.getState();
- if (processInstanceState.typeIsFinished()
- || processInstanceState == ExecutionStatus.READY_PAUSE
- || processInstanceState == ExecutionStatus.READY_STOP) {
- logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState);
- return null;
- }
- taskInstance.setExecutorId(processInstance.getExecutorId());
- taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
- taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
- if (taskInstance.getSubmitTime() == null) {
- taskInstance.setSubmitTime(new Date());
- }
- if (taskInstance.getFirstSubmitTime() == null) {
- taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
- }
- boolean saveResult = saveTaskInstance(taskInstance);
- if (!saveResult) {
- return null;
- }
- return taskInstance;
- }
-
- /**
- * get submit task instance state by the work process state
- * cannot modify the task state when running/kill/submit success, or this
- * task instance is already exists in task queue .
- * return pause if work process state is ready pause
- * return stop if work process state is ready stop
- * if all of above are not satisfied, return submit success
- *
- * @param taskInstance taskInstance
- * @param processInstance processInstance
- * @return process instance state
- */
- public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
- ExecutionStatus state = taskInstance.getState();
- // running, delayed or killed
- // the task already exists in task queue
- // return state
- if (
- state == ExecutionStatus.RUNNING_EXECUTION
- || state == ExecutionStatus.DELAY_EXECUTION
- || state == ExecutionStatus.KILL
- || state == ExecutionStatus.DISPATCH
- ) {
- return state;
- }
- //return pasue /stop if process instance state is ready pause / stop
- // or return submit success
- if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
- state = ExecutionStatus.PAUSE;
- } else if (processInstance.getState() == ExecutionStatus.READY_STOP
- || !checkProcessStrategy(taskInstance, processInstance)) {
- state = ExecutionStatus.KILL;
- } else {
- state = ExecutionStatus.SUBMITTED_SUCCESS;
- }
- return state;
- }
-
- /**
- * check process instance strategy
- *
- * @param taskInstance taskInstance
- * @return check strategy result
- */
- private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) {
- FailureStrategy failureStrategy = processInstance.getFailureStrategy();
- if (failureStrategy == FailureStrategy.CONTINUE) {
- return true;
- }
- List<TaskInstance> taskInstances = this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
-
- for (TaskInstance task : taskInstances) {
- if (task.getState() == ExecutionStatus.FAILURE
- && task.getRetryTimes() >= task.getMaxRetryTimes()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * insert or update work process instance to data base
- *
- * @param processInstance processInstance
- */
- public void saveProcessInstance(ProcessInstance processInstance) {
- if (processInstance == null) {
- logger.error("save error, process instance is null!");
- return;
- }
- if (processInstance.getId() != 0) {
- processInstanceMapper.updateById(processInstance);
- } else {
- processInstanceMapper.insert(processInstance);
- }
- }
-
- /**
- * insert or update command
- *
- * @param command command
- * @return save command result
- */
- public int saveCommand(Command command) {
- if (command.getId() != 0) {
- return commandMapper.updateById(command);
- } else {
- return commandMapper.insert(command);
- }
- }
-
- /**
- * insert or update task instance
- *
- * @param taskInstance taskInstance
- * @return save task instance result
- */
- public boolean saveTaskInstance(TaskInstance taskInstance) {
- if (taskInstance.getId() != 0) {
- return updateTaskInstance(taskInstance);
- } else {
- return createTaskInstance(taskInstance);
- }
- }
-
- /**
- * insert task instance
- *
- * @param taskInstance taskInstance
- * @return create task instance result
- */
- public boolean createTaskInstance(TaskInstance taskInstance) {
- int count = taskInstanceMapper.insert(taskInstance);
- return count > 0;
- }
-
- /**
- * update task instance
- *
- * @param taskInstance taskInstance
- * @return update task instance result
- */
- public boolean updateTaskInstance(TaskInstance taskInstance) {
- int count = taskInstanceMapper.updateById(taskInstance);
- return count > 0;
- }
-
- /**
- * find task instance by id
- *
- * @param taskId task id
- * @return task instance
- */
- public TaskInstance findTaskInstanceById(Integer taskId) {
- return taskInstanceMapper.selectById(taskId);
- }
-
- /**
- * find task instance list by id list
- *
- * @param idList task id list
- * @return task instance list
- */
- public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
- if (CollectionUtils.isEmpty(idList)) {
- return new ArrayList<>();
- }
- return taskInstanceMapper.selectBatchIds(idList);
- }
-
- /**
- * package task instance
- */
- public void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance) {
- taskInstance.setProcessInstance(processInstance);
- taskInstance.setProcessDefine(processInstance.getProcessDefinition());
- TaskDefinition taskDefinition = this.findTaskDefinition(
- taskInstance.getTaskCode(),
- taskInstance.getTaskDefinitionVersion());
- this.updateTaskDefinitionResources(taskDefinition);
- taskInstance.setTaskDefine(taskDefinition);
- }
-
- /**
- * Update {@link ResourceInfo} information in {@link TaskDefinition}
- *
- * @param taskDefinition the given {@link TaskDefinition}
- */
- public void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
- Map<String, Object> taskParameters = JSONUtils.parseObject(
- taskDefinition.getTaskParams(),
- new TypeReference<Map<String, Object>>() {
- });
- if (taskParameters != null) {
- // if contains mainJar field, query resource from database
- // Flink, Spark, MR
- if (taskParameters.containsKey("mainJar")) {
- Object mainJarObj = taskParameters.get("mainJar");
- ResourceInfo mainJar = JSONUtils.parseObject(
- JSONUtils.toJsonString(mainJarObj),
- ResourceInfo.class);
- ResourceInfo resourceInfo = updateResourceInfo(mainJar);
- if (resourceInfo != null) {
- taskParameters.put("mainJar", resourceInfo);
- }
- }
- // update resourceList information
- if (taskParameters.containsKey("resourceList")) {
- String resourceListStr = JSONUtils.toJsonString(taskParameters.get("resourceList"));
- List<ResourceInfo> resourceInfos = JSONUtils.toList(resourceListStr, ResourceInfo.class);
- List<ResourceInfo> updatedResourceInfos = resourceInfos
- .stream()
- .map(this::updateResourceInfo)
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
- taskParameters.put("resourceList", updatedResourceInfos);
- }
- // set task parameters
- taskDefinition.setTaskParams(JSONUtils.toJsonString(taskParameters));
- }
- }
-
- /**
- * update {@link ResourceInfo} by given original ResourceInfo
- *
- * @param res origin resource info
- * @return {@link ResourceInfo}
- */
- private ResourceInfo updateResourceInfo(ResourceInfo res) {
- ResourceInfo resourceInfo = null;
- // only if mainJar is not null and does not contains "resourceName" field
- if (res != null) {
- int resourceId = res.getId();
- if (resourceId <= 0) {
- logger.error("invalid resourceId, {}", resourceId);
- return null;
- }
- resourceInfo = new ResourceInfo();
- // get resource from database, only one resource should be returned
- Resource resource = getResourceById(resourceId);
- resourceInfo.setId(resourceId);
- resourceInfo.setRes(resource.getFileName());
- resourceInfo.setResourceName(resource.getFullName());
- if (logger.isInfoEnabled()) {
- logger.info("updated resource info {}",
- JSONUtils.toJsonString(resourceInfo));
- }
- }
- return resourceInfo;
- }
-
- /**
- * get id list by task state
- *
- * @param instanceId instanceId
- * @param state state
- * @return task instance states
- */
- public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
- return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal());
- }
-
- /**
- * find valid task list by process definition id
- *
- * @param processInstanceId processInstanceId
- * @return task instance list
- */
- public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId) {
- return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES);
- }
-
- /**
- * find previous task list by work process id
- *
- * @param processInstanceId processInstanceId
- * @return task instance list
- */
- public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) {
- return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO);
- }
-
- /**
- * update work process instance map
- *
- * @param processInstanceMap processInstanceMap
- * @return update process instance result
- */
- public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
- return processInstanceMapMapper.updateById(processInstanceMap);
- }
-
- /**
- * create work process instance map
- *
- * @param processInstanceMap processInstanceMap
- * @return create process instance result
- */
- public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
- int count = 0;
- if (processInstanceMap != null) {
- return processInstanceMapMapper.insert(processInstanceMap);
- }
- return count;
- }
-
- /**
- * find work process map by parent process id and parent task id.
- *
- * @param parentWorkProcessId parentWorkProcessId
- * @param parentTaskId parentTaskId
- * @return process instance map
- */
- public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
- return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
- }
-
- /**
- * delete work process map by parent process id
- *
- * @param parentWorkProcessId parentWorkProcessId
- * @return delete process map result
- */
- public int deleteWorkProcessMapByParentId(int parentWorkProcessId) {
- return processInstanceMapMapper.deleteByParentProcessId(parentWorkProcessId);
-
- }
-
- /**
- * find sub process instance
- *
- * @param parentProcessId parentProcessId
- * @param parentTaskId parentTaskId
- * @return process instance
- */
- public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
- ProcessInstance processInstance = null;
- ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryByParentId(parentProcessId, parentTaskId);
- if (processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0) {
- return processInstance;
- }
- processInstance = findProcessInstanceById(processInstanceMap.getProcessInstanceId());
- return processInstance;
- }
-
- /**
- * find parent process instance
- *
- * @param subProcessId subProcessId
- * @return process instance
- */
- public ProcessInstance findParentProcessInstance(Integer subProcessId) {
- ProcessInstance processInstance = null;
- ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(subProcessId);
- if (processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0) {
- return processInstance;
- }
- processInstance = findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
- return processInstance;
- }
-
- /**
- * update process instance
- *
- * @param processInstance processInstance
- * @return update process instance result
- */
- public int updateProcessInstance(ProcessInstance processInstance) {
- return processInstanceMapper.updateById(processInstance);
- }
-
- /**
- * for show in page of taskInstance
- */
- public void changeOutParam(TaskInstance taskInstance) {
- if (StringUtils.isEmpty(taskInstance.getVarPool())) {
- return;
- }
- List<Property> properties = JSONUtils.toList(taskInstance.getVarPool(), Property.class);
- if (CollectionUtils.isEmpty(properties)) {
- return;
- }
- //if the result more than one line,just get the first .
- Map<String, Object> taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {
- });
- Object localParams = taskParams.get(LOCAL_PARAMS);
- if (localParams == null) {
- return;
- }
- List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
- Map<String, String> outProperty = new HashMap<>();
- for (Property info : properties) {
- if (info.getDirect() == Direct.OUT) {
- outProperty.put(info.getProp(), info.getValue());
- }
- }
- for (Property info : allParam) {
- if (info.getDirect() == Direct.OUT) {
- String paramName = info.getProp();
- info.setValue(outProperty.get(paramName));
- }
- }
- taskParams.put(LOCAL_PARAMS, allParam);
- taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams));
- }
-
- /**
- * convert integer list to string list
- *
- * @param intList intList
- * @return string list
- */
- public List<String> convertIntListToString(List<Integer> intList) {
- if (intList == null) {
- return new ArrayList<>();
- }
- List<String> result = new ArrayList<>(intList.size());
- for (Integer intVar : intList) {
- result.add(String.valueOf(intVar));
- }
- return result;
- }
-
- /**
- * query schedule by id
- *
- * @param id id
- * @return schedule
- */
- public Schedule querySchedule(int id) {
- return scheduleMapper.selectById(id);
- }
-
- /**
- * query Schedule by processDefinitionCode
- *
- * @param processDefinitionCode processDefinitionCode
- * @see Schedule
- */
- public List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode) {
- return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
- }
-
- /**
- * query Schedule by processDefinitionCode
- *
- * @param processDefinitionCodeList processDefinitionCodeList
- * @see Schedule
- */
- public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
- List<Schedule> processDefinitionScheduleList = scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
- return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
- Schedule::getWorkerGroup));
- }
-
- /**
- * query dependent process definition by process definition code
- *
- * @param processDefinitionCode processDefinitionCode
- * @see DependentProcessDefinition
- */
- public List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode) {
- return workFlowLineageMapper.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
- }
-
- /**
- * query need failover process instance
- *
- * @param host host
- * @return process instance list
- */
- public List<ProcessInstance> queryNeedFailoverProcessInstances(String host) {
- return processInstanceMapper.queryByHostAndStatus(host, stateArray);
- }
-
- public List<String> queryNeedFailoverProcessInstanceHost() {
- return processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray);
- }
-
- /**
- * process need failover process instance
- *
- * @param processInstance processInstance
- */
@Transactional(rollbackFor = RuntimeException.class)
- public void processNeedFailoverProcessInstances(ProcessInstance processInstance) {
- //1 update processInstance host is null
- processInstance.setHost(Constants.NULL);
- processInstanceMapper.updateById(processInstance);
-
- ProcessDefinition processDefinition = findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
-
- //2 insert into recover command
- Command cmd = new Command();
- cmd.setProcessDefinitionCode(processDefinition.getCode());
- cmd.setProcessDefinitionVersion(processDefinition.getVersion());
- cmd.setProcessInstanceId(processInstance.getId());
- cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
- cmd.setExecutorId(processInstance.getExecutorId());
- cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
- createCommand(cmd);
- }
-
- /**
- * query all need failover task instances by host
- *
- * @param host host
- * @return task instance list
- */
- public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
- return taskInstanceMapper.queryByHostAndStatus(host,
- stateArray);
- }
-
- /**
- * find data source by id
- *
- * @param id id
- * @return datasource
- */
- public DataSource findDataSourceById(int id) {
- return dataSourceMapper.selectById(id);
- }
-
- /**
- * update process instance state by id
- *
- * @param processInstanceId processInstanceId
- * @param executionStatus executionStatus
- * @return update process result
- */
- public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
- ProcessInstance instance = processInstanceMapper.selectById(processInstanceId);
- instance.setState(executionStatus);
- return processInstanceMapper.updateById(instance);
- }
-
- /**
- * find process instance by the task id
- *
- * @param taskId taskId
- * @return process instance
- */
- public ProcessInstance findProcessInstanceByTaskId(int taskId) {
- TaskInstance taskInstance = taskInstanceMapper.selectById(taskId);
- if (taskInstance != null) {
- return processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
- }
- return null;
- }
-
- /**
- * find udf function list by id list string
- *
- * @param ids ids
- * @return udf function list
- */
- public List<UdfFunc> queryUdfFunListByIds(Integer[] ids) {
- return udfFuncMapper.queryUdfByIdStr(ids, null);
- }
-
- /**
- * find tenant code by resource name
- *
- * @param resName resource name
- * @param resourceType resource type
- * @return tenant code
- */
- public String queryTenantCodeByResName(String resName, ResourceType resourceType) {
- // in order to query tenant code successful although the version is older
- String fullName = resName.startsWith("/") ? resName : String.format("/%s", resName);
-
- List<Resource> resourceList = resourceMapper.queryResource(fullName, resourceType.ordinal());
- if (CollectionUtils.isEmpty(resourceList)) {
- return StringUtils.EMPTY;
- }
- int userId = resourceList.get(0).getUserId();
- User user = userMapper.selectById(userId);
- if (Objects.isNull(user)) {
- return StringUtils.EMPTY;
- }
- Tenant tenant = tenantMapper.queryById(user.getTenantId());
- if (Objects.isNull(tenant)) {
- return StringUtils.EMPTY;
- }
- return tenant.getTenantCode();
- }
-
- /**
- * find schedule list by process define codes.
- *
- * @param codes codes
- * @return schedule list
- */
- public List<Schedule> selectAllByProcessDefineCode(long[] codes) {
- return scheduleMapper.selectAllByProcessDefineArray(codes);
- }
-
- /**
- * find last scheduler process instance in the date interval
- *
- * @param definitionCode definitionCode
- * @param dateInterval dateInterval
- * @return process instance
- */
- public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
- return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
- dateInterval.getStartTime(),
- dateInterval.getEndTime());
- }
-
- /**
- * find last manual process instance interval
- *
- * @param definitionCode process definition code
- * @param dateInterval dateInterval
- * @return process instance
- */
- public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
- return processInstanceMapper.queryLastManualProcess(definitionCode,
- dateInterval.getStartTime(),
- dateInterval.getEndTime());
- }
-
- /**
- * find last running process instance
- *
- * @param definitionCode process definition code
- * @param startTime start time
- * @param endTime end time
- * @return process instance
- */
- public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
- return processInstanceMapper.queryLastRunningProcess(definitionCode,
- startTime,
- endTime,
- stateArray);
- }
-
- /**
- * query user queue by process instance
- *
- * @param processInstance processInstance
- * @return queue
- */
- public String queryUserQueueByProcessInstance(ProcessInstance processInstance) {
-
- String queue = "";
- if (processInstance == null) {
- return queue;
- }
- User executor = userMapper.selectById(processInstance.getExecutorId());
- if (executor != null) {
- queue = executor.getQueue();
- }
- return queue;
- }
-
- /**
- * query project name and user name by processInstanceId.
- *
- * @param processInstanceId processInstanceId
- * @return projectName and userName
- */
- public ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId) {
- return projectMapper.queryProjectWithUserByProcessInstanceId(processInstanceId);
- }
-
- /**
- * get task worker group
- *
- * @param taskInstance taskInstance
- * @return workerGroupId
- */
- public String getTaskWorkerGroup(TaskInstance taskInstance) {
- String workerGroup = taskInstance.getWorkerGroup();
-
- if (StringUtils.isNotBlank(workerGroup)) {
- return workerGroup;
- }
- int processInstanceId = taskInstance.getProcessInstanceId();
- ProcessInstance processInstance = findProcessInstanceById(processInstanceId);
-
- if (processInstance != null) {
- return processInstance.getWorkerGroup();
- }
- logger.info("task : {} will use default worker group", taskInstance.getId());
- return Constants.DEFAULT_WORKER_GROUP;
- }
-
- /**
- * get have perm project list
- *
- * @param userId userId
- * @return project list
- */
- public List<Project> getProjectListHavePerm(int userId) {
- List<Project> createProjects = projectMapper.queryProjectCreatedByUser(userId);
- List<Project> authedProjects = projectMapper.queryAuthedProjectListByUserId(userId);
-
- if (createProjects == null) {
- createProjects = new ArrayList<>();
- }
-
- if (authedProjects != null) {
- createProjects.addAll(authedProjects);
- }
- return createProjects;
- }
-
- /**
- * list unauthorized udf function
- *
- * @param userId user id
- * @param needChecks data source id array
- * @return unauthorized udf function list
- */
- public <T> List<T> listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType) {
- List<T> resultList = new ArrayList<>();
-
- if (Objects.nonNull(needChecks) && needChecks.length > 0) {
- Set<T> originResSet = new HashSet<>(Arrays.asList(needChecks));
-
- switch (authorizationType) {
- case RESOURCE_FILE_ID:
- case UDF_FILE:
- List<Resource> ownUdfResources = resourceMapper.listAuthorizedResourceById(userId, needChecks);
- addAuthorizedResources(ownUdfResources, userId);
- Set<Integer> authorizedResourceFiles = ownUdfResources.stream().map(Resource::getId).collect(toSet());
- originResSet.removeAll(authorizedResourceFiles);
- break;
- case RESOURCE_FILE_NAME:
- List<Resource> ownResources = resourceMapper.listAuthorizedResource(userId, needChecks);
- addAuthorizedResources(ownResources, userId);
- Set<String> authorizedResources = ownResources.stream().map(Resource::getFullName).collect(toSet());
- originResSet.removeAll(authorizedResources);
- break;
- case DATASOURCE:
- Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(DataSource::getId).collect(toSet());
- originResSet.removeAll(authorizedDatasources);
- break;
- case UDF:
- Set<Integer> authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(UdfFunc::getId).collect(toSet());
- originResSet.removeAll(authorizedUdfs);
- break;
- default:
- break;
- }
-
- resultList.addAll(originResSet);
- }
-
- return resultList;
- }
-
- /**
- * get user by user id
- *
- * @param userId user id
- * @return User
- */
- public User getUserById(int userId) {
- return userMapper.selectById(userId);
- }
-
- /**
- * get resource by resource id
- *
- * @param resourceId resource id
- * @return Resource
- */
- public Resource getResourceById(int resourceId) {
- return resourceMapper.selectById(resourceId);
- }
-
- /**
- * list resources by ids
- *
- * @param resIds resIds
- * @return resource list
- */
- public List<Resource> listResourceByIds(Integer[] resIds) {
- return resourceMapper.listResourceByIds(resIds);
- }
-
- /**
- * format task app id in task instance
- */
- public String formatTaskAppId(TaskInstance taskInstance) {
- ProcessInstance processInstance = findProcessInstanceById(taskInstance.getProcessInstanceId());
- if (processInstance == null) {
- return "";
- }
- ProcessDefinition definition = findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
- if (definition == null) {
- return "";
- }
- return String.format("%s_%s_%s", definition.getId(), processInstance.getId(), taskInstance.getId());
- }
-
- /**
- * switch process definition version to process definition log version
- */
- public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
- if (null == processDefinition || null == processDefinitionLog) {
- return Constants.DEFINITION_FAILURE;
- }
- processDefinitionLog.setId(processDefinition.getId());
- processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
- processDefinitionLog.setFlag(Flag.YES);
-
- int result = processDefineMapper.updateById(processDefinitionLog);
- if (result > 0) {
- result = switchProcessTaskRelationVersion(processDefinitionLog);
- if (result <= 0) {
- return Constants.EXIT_CODE_FAILURE;
- }
- }
- return result;
- }
-
- public int switchProcessTaskRelationVersion(ProcessDefinition processDefinition) {
- List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
- if (!processTaskRelationList.isEmpty()) {
- processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode());
- }
- List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
- int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
- if (batchInsert == 0) {
- return Constants.EXIT_CODE_FAILURE;
- } else {
- int result = 0;
- for (ProcessTaskRelationLog taskRelationLog : processTaskRelationLogList) {
- int switchResult = switchTaskDefinitionVersion(taskRelationLog.getPostTaskCode(), taskRelationLog.getPostTaskVersion());
- if (switchResult != Constants.EXIT_CODE_FAILURE) {
- result++;
- }
- }
- return result;
- }
- }
-
- public int switchTaskDefinitionVersion(long taskCode, int taskVersion) {
- TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
- if (taskDefinition == null) {
- return Constants.EXIT_CODE_FAILURE;
- }
- if (taskDefinition.getVersion() == taskVersion) {
- return Constants.EXIT_CODE_SUCCESS;
- }
- TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskVersion);
- if (taskDefinitionUpdate == null) {
- return Constants.EXIT_CODE_FAILURE;
- }
- taskDefinitionUpdate.setUpdateTime(new Date());
- taskDefinitionUpdate.setId(taskDefinition.getId());
- return taskDefinitionMapper.updateById(taskDefinitionUpdate);
- }
-
- /**
- * get resource ids
- *
- * @param taskDefinition taskDefinition
- * @return resource ids
- */
- public String getResourceIds(TaskDefinition taskDefinition) {
- Set<Integer> resourceIds = null;
- AbstractParameters params = taskPluginManager.getParameters(ParametersNode.builder().taskType(taskDefinition.getTaskType()).taskParams(taskDefinition.getTaskParams()).build());
-
- if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
- resourceIds = params.getResourceFilesList().
- stream()
- .filter(t -> t.getId() != 0)
- .map(ResourceInfo::getId)
- .collect(Collectors.toSet());
- }
- if (CollectionUtils.isEmpty(resourceIds)) {
- return StringUtils.EMPTY;
- }
- return StringUtils.join(resourceIds, ",");
- }
-
- public int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs, Boolean syncDefine) {
- Date now = new Date();
- List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
- List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
- for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
- taskDefinitionLog.setProjectCode(projectCode);
- taskDefinitionLog.setUpdateTime(now);
- taskDefinitionLog.setOperateTime(now);
- taskDefinitionLog.setOperator(operator.getId());
- taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
- if (taskDefinitionLog.getCode() == 0) {
- try {
- taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
- } catch (CodeGenerateException e) {
- logger.error("Task code get error, ", e);
- return Constants.DEFINITION_FAILURE;
- }
- }
- if (taskDefinitionLog.getVersion() == 0) {
- // init first version
- taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
- }
-
- TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
- .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
- if (definitionCodeAndVersion == null) {
- taskDefinitionLog.setUserId(operator.getId());
- taskDefinitionLog.setCreateTime(now);
- newTaskDefinitionLogs.add(taskDefinitionLog);
- continue;
- }
- if (taskDefinitionLog.equals(definitionCodeAndVersion)) {
- // do nothing if equals
- continue;
- }
- taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
- Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
- taskDefinitionLog.setVersion(version + 1);
- taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
- updateTaskDefinitionLogs.add(taskDefinitionLog);
- }
- int insertResult = 0;
- int updateResult = 0;
- for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
- TaskDefinition task = taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
- if (task == null) {
- newTaskDefinitionLogs.add(taskDefinitionToUpdate);
- } else {
- insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
- if (Boolean.TRUE.equals(syncDefine)) {
- taskDefinitionToUpdate.setId(task.getId());
- updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate);
- } else {
- updateResult++;
- }
- }
- }
- if (!newTaskDefinitionLogs.isEmpty()) {
- insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
- if (Boolean.TRUE.equals(syncDefine)) {
- updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
- } else {
- updateResult += newTaskDefinitionLogs.size();
- }
- }
- return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS;
- }
-
- /**
- * save processDefinition (including create or update processDefinition)
- */
- public int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean syncDefine, Boolean isFromProcessDefine) {
- ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
- Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
- int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
- processDefinitionLog.setVersion(insertVersion);
- processDefinitionLog.setReleaseState(!isFromProcessDefine || processDefinitionLog.getReleaseState() == ReleaseState.ONLINE ? ReleaseState.ONLINE : ReleaseState.OFFLINE);
- processDefinitionLog.setOperator(operator.getId());
- processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
- int insertLog = processDefineLogMapper.insert(processDefinitionLog);
- int result = 1;
- if (Boolean.TRUE.equals(syncDefine)) {
- if (0 == processDefinition.getId()) {
- result = processDefineMapper.insert(processDefinitionLog);
- } else {
- processDefinitionLog.setId(processDefinition.getId());
- result = processDefineMapper.updateById(processDefinitionLog);
- }
- }
- return (insertLog & result) > 0 ? insertVersion : 0;
- }
-
- /**
- * save task relations
- */
- public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion,
- List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs,
- Boolean syncDefine) {
- if (taskRelationList.isEmpty()) {
- return Constants.EXIT_CODE_SUCCESS;
- }
- Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
- if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
- taskDefinitionLogMap = taskDefinitionLogs.stream()
- .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
- }
- Date now = new Date();
- for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
- processTaskRelationLog.setProjectCode(projectCode);
- processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
- processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
- if (taskDefinitionLogMap != null) {
- TaskDefinitionLog preTaskDefinitionLog = taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
- if (preTaskDefinitionLog != null) {
- processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
- }
- TaskDefinitionLog postTaskDefinitionLog = taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
- if (postTaskDefinitionLog != null) {
- processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
- }
- }
- processTaskRelationLog.setCreateTime(now);
- processTaskRelationLog.setUpdateTime(now);
- processTaskRelationLog.setOperator(operator.getId());
- processTaskRelationLog.setOperateTime(now);
- }
- int insert = taskRelationList.size();
- if (Boolean.TRUE.equals(syncDefine)) {
- List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
- if (!processTaskRelationList.isEmpty()) {
- Set<Integer> processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
- Set<Integer> taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
- boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
- if (result) {
- return Constants.EXIT_CODE_SUCCESS;
- }
- processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
- }
- insert = processTaskRelationMapper.batchInsert(taskRelationList);
- }
- int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
- return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
- }
-
- public boolean isTaskOnline(long taskCode) {
- List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
- if (!processTaskRelationList.isEmpty()) {
- Set<Long> processDefinitionCodes = processTaskRelationList
- .stream()
- .map(ProcessTaskRelation::getProcessDefinitionCode)
- .collect(Collectors.toSet());
- List<ProcessDefinition> processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes);
- // check process definition is already online
- for (ProcessDefinition processDefinition : processDefinitionList) {
- if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Generate the DAG Graph based on the process definition id
- * Use temporarily before refactoring taskNode
- *
- * @param processDefinition process definition
- * @return dag graph
- */
- public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
- List<ProcessTaskRelation> taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
- List<TaskNode> taskNodeList = transformTask(taskRelations, Lists.newArrayList());
- ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(taskRelations));
- // Generate concrete Dag to be executed
- return DagHelper.buildDagGraph(processDag);
- }
-
- /**
- * generate DagData
- */
- public DagData genDagData(ProcessDefinition processDefinition) {
- List<ProcessTaskRelation> taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
- List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(taskRelations);
- List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream().map(t -> (TaskDefinition) t).collect(Collectors.toList());
- return new DagData(processDefinition, taskRelations, taskDefinitions);
- }
-
- public List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations) {
- Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
- for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
- if (processTaskRelation.getPreTaskCode() > 0) {
- taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion()));
- }
- if (processTaskRelation.getPostTaskCode() > 0) {
- taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()));
- }
- }
- if (taskDefinitionSet.isEmpty()) {
- return Lists.newArrayList();
- }
- return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
- }
-
- public List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
- List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
- Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
- for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
- if (processTaskRelation.getPreTaskCode() > 0) {
- taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion());
- }
- if (processTaskRelation.getPostTaskCode() > 0) {
- taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
- }
- }
- taskCodeVersionMap.forEach((code, version) -> {
- taskDefinitionLogs.add((TaskDefinitionLog) this.findTaskDefinition(code, version));
- });
- return taskDefinitionLogs;
- }
-
- /**
- * find task definition by code and version
- */
- public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
- return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
- }
-
- /**
- * find process task relation list by process
- */
- public List<ProcessTaskRelation> findRelationByCode(long processDefinitionCode, int processDefinitionVersion) {
- List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinitionCode, processDefinitionVersion);
- return processTaskRelationLogList.stream().map(r -> (ProcessTaskRelation) r).collect(Collectors.toList());
- }
-
- /**
- * add authorized resources
- *
- * @param ownResources own resources
- * @param userId userId
- */
- private void addAuthorizedResources(List<Resource> ownResources, int userId) {
- List<Integer> relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7);
- List<Resource> relationResources = CollectionUtils.isNotEmpty(relationResourceIds) ? resourceMapper.queryResourceListById(relationResourceIds) : new ArrayList<>();
- ownResources.addAll(relationResources);
- }
-
- /**
- * Use temporarily before refactoring taskNode
- */
- public List<TaskNode> transformTask(List<ProcessTaskRelation> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) {
- Map<Long, List<Long>> taskCodeMap = new HashMap<>();
- for (ProcessTaskRelation processTaskRelation : taskRelationList) {
- taskCodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
- if (v == null) {
- v = new ArrayList<>();
- }
- if (processTaskRelation.getPreTaskCode() != 0L) {
- v.add(processTaskRelation.getPreTaskCode());
- }
- return v;
- });
- }
- if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
- taskDefinitionLogs = genTaskDefineList(taskRelationList);
- }
- Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
- .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
- List<TaskNode> taskNodeList = new ArrayList<>();
- for (Entry<Long, List<Long>> code : taskCodeMap.entrySet()) {
- TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(code.getKey());
- if (taskDefinitionLog != null) {
- TaskNode taskNode = new TaskNode();
- taskNode.setCode(taskDefinitionLog.getCode());
- taskNode.setVersion(taskDefinitionLog.getVersion());
- taskNode.setName(taskDefinitionLog.getName());
- taskNode.setDesc(taskDefinitionLog.getDescription());
- taskNode.setType(taskDefinitionLog.getTaskType().toUpperCase());
- taskNode.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
- taskNode.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
- taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
- Map<String, Object> taskParamsMap = taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
- taskNode.setConditionResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT)));
- taskNode.setSwitchResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.SWITCH_RESULT)));
- taskNode.setDependence(JSONUtils.toJsonString(taskParamsMap.get(Constants.DEPENDENCE)));
- taskParamsMap.remove(Constants.CONDITION_RESULT);
- taskParamsMap.remove(Constants.DEPENDENCE);
- taskNode.setParams(JSONUtils.toJsonString(taskParamsMap));
- taskNode.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
- taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
- taskNode.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode());
- taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
- taskDefinitionLog.getTimeoutNotifyStrategy(),
- taskDefinitionLog.getTimeout())));
- taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
- taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
- taskNode.setTaskGroupId(taskDefinitionLog.getTaskGroupId());
- taskNode.setTaskGroupPriority(taskDefinitionLog.getTaskGroupPriority());
- taskNodeList.add(taskNode);
- }
- }
- return taskNodeList;
- }
-
- public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId) {
- HashMap<ProcessInstance, TaskInstance> processTaskMap = new HashMap<>();
- //find sub tasks
- ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(processId);
- if (processInstanceMap == null) {
- return processTaskMap;
- }
- ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
- TaskInstance fatherTask = this.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId());
-
- if (fatherProcess != null) {
- processTaskMap.put(fatherProcess, fatherTask);
- }
- return processTaskMap;
- }
-
- public DqExecuteResult getDqExecuteResultByTaskInstanceId(int taskInstanceId) {
- return dqExecuteResultMapper.getExecuteResultById(taskInstanceId);
- }
-
- public int updateDqExecuteResultUserId(int taskInstanceId) {
- DqExecuteResult dqExecuteResult =
- dqExecuteResultMapper.selectOne(new QueryWrapper<DqExecuteResult>().eq(TASK_INSTANCE_ID, taskInstanceId));
- if (dqExecuteResult == null) {
- return -1;
- }
-
- ProcessInstance processInstance = processInstanceMapper.selectById(dqExecuteResult.getProcessInstanceId());
- if (processInstance == null) {
- return -1;
- }
-
- ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
- if (processDefinition == null) {
- return -1;
- }
-
- dqExecuteResult.setProcessDefinitionId(processDefinition.getId());
- dqExecuteResult.setUserId(processDefinition.getUserId());
- dqExecuteResult.setState(DqTaskState.DEFAULT.getCode());
- return dqExecuteResultMapper.updateById(dqExecuteResult);
- }
-
- public int updateDqExecuteResultState(DqExecuteResult dqExecuteResult) {
- return dqExecuteResultMapper.updateById(dqExecuteResult);
- }
-
- public int deleteDqExecuteResultByTaskInstanceId(int taskInstanceId) {
- return dqExecuteResultMapper.delete(
- new QueryWrapper<DqExecuteResult>()
- .eq(TASK_INSTANCE_ID, taskInstanceId));
- }
-
- public int deleteTaskStatisticsValueByTaskInstanceId(int taskInstanceId) {
- return dqTaskStatisticsValueMapper.delete(
- new QueryWrapper<DqTaskStatisticsValue>()
- .eq(TASK_INSTANCE_ID, taskInstanceId));
- }
-
- public DqRule getDqRule(int ruleId) {
- return dqRuleMapper.selectById(ruleId);
- }
-
- public List<DqRuleInputEntry> getRuleInputEntry(int ruleId) {
- return DqRuleUtils.transformInputEntry(dqRuleInputEntryMapper.getRuleInputEntryList(ruleId));
- }
-
- public List<DqRuleExecuteSql> getDqExecuteSql(int ruleId) {
- return dqRuleExecuteSqlMapper.getExecuteSqlList(ruleId);
- }
-
- public DqComparisonType getComparisonTypeById(int id) {
- return dqComparisonTypeMapper.selectById(id);
- }
-
- /**
- * the first time (when submit the task ) get the resource of the task group
- *
- * @param taskId task id
- */
- public boolean acquireTaskGroup(int taskId,
- String taskName, int groupId,
- int processId, int priority) {
- TaskGroup taskGroup = taskGroupMapper.selectById(groupId);
- if (taskGroup == null) {
- return true;
- }
- // if task group is not applicable
- if (taskGroup.getStatus() == Flag.NO.getCode()) {
- return true;
- }
- TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
- if (taskGroupQueue == null) {
- taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE);
- } else {
- if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
- return true;
- }
- taskGroupQueue.setInQueue(Flag.NO.getCode());
- taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
- this.taskGroupQueueMapper.updateById(taskGroupQueue);
- }
- //check priority
- List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode());
- if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
- this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
- return false;
- }
- //try to get taskGroup
- int count = taskGroupMapper.selectAvailableCountById(groupId);
- if (count == 1 && robTaskGroupResouce(taskGroupQueue)) {
- return true;
- }
- this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
- return false;
- }
-
- /**
- * try to get the task group resource(when other task release the resource)
- */
- public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
- TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
- int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), taskGroupQueue.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode());
- if (affectedCount > 0) {
- taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
- this.taskGroupQueueMapper.updateById(taskGroupQueue);
- this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
- return true;
- }
- return false;
- }
-
- public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) {
- return robTaskGroupResouce(taskGroupQueue);
- }
-
- public void releaseAllTaskGroup(int processInstanceId) {
- List<TaskInstance> taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
- for (TaskInstance info : taskInstances) {
- releaseTaskGroup(info);
- }
- }
-
- /**
- * release the TGQ resource when the corresponding task is finished.
- *
- * @return the result code and msg
- */
- public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
-
- TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
- if (taskGroup == null) {
- return null;
- }
- TaskGroupQueue thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
- if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
- return null;
- }
- try {
- while (taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize()
- , thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
- thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
- if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
- return null;
- }
- taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
- }
- } catch (Exception e) {
- logger.error("release the task group error", e);
- }
- logger.info("updateTask:{}", taskInstance.getName());
- changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
- TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode());
- if (taskGroupQueue == null) {
- return null;
- }
- while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1) {
- taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode());
- if (taskGroupQueue == null) {
- return null;
- }
- }
- return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
- }
-
- /**
- * release the TGQ resource when the corresponding task is finished.
- *
- * @param taskId task id
- * @return the result code and msg
- */
-
- public void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status) {
- TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskId);
- taskGroupQueue.setStatus(status);
- taskGroupQueue.setUpdateTime(new Date(System.currentTimeMillis()));
- taskGroupQueueMapper.updateById(taskGroupQueue);
- }
-
- /**
- * insert into task group queue
- *
- * @param taskId task id
- * @param taskName task name
- * @param groupId group id
- * @param processId process id
- * @param priority priority
- * @return result and msg code
- */
- public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
- String taskName, Integer groupId,
- Integer processId, Integer priority, TaskGroupQueueStatus status) {
- TaskGroupQueue taskGroupQueue = new TaskGroupQueue(taskId, taskName, groupId, processId, priority, status);
- taskGroupQueue.setCreateTime(new Date());
- taskGroupQueue.setUpdateTime(new Date());
- taskGroupQueueMapper.insert(taskGroupQueue);
- return taskGroupQueue;
- }
-
- public int updateTaskGroupQueueStatus(Integer taskId, int status) {
- return taskGroupQueueMapper.updateStatusByTaskId(taskId, status);
- }
-
- public int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue) {
- return taskGroupQueueMapper.updateById(taskGroupQueue);
- }
-
- public TaskGroupQueue loadTaskGroupQueue(int taskId) {
- return this.taskGroupQueueMapper.queryByTaskId(taskId);
- }
-
- public void sendStartTask2Master(ProcessInstance processInstance, int taskId,
- org.apache.dolphinscheduler.remote.command.CommandType taskType) {
- String host = processInstance.getHost();
- String address = host.split(":")[0];
- int port = Integer.parseInt(host.split(":")[1]);
- TaskEventChangeCommand taskEventChangeCommand = new TaskEventChangeCommand(
- processInstance.getId(), taskId
- );
- stateEventCallbackService.sendResult(address, port, taskEventChangeCommand.convert2Command(taskType));
- }
-
- public ProcessInstance loadNextProcess4Serial(long code, int state) {
- return this.processInstanceMapper.loadNextProcess4Serial(code, state);
- }
-
- protected void deleteCommandWithCheck(int commandId) {
- int delete = this.commandMapper.deleteById(commandId);
- if (delete != 1) {
- throw new ServiceException("delete command fail, id:" + commandId);
- }
- }
+ void processNeedFailoverProcessInstances(ProcessInstance processInstance);
+
+ List<TaskInstance> queryNeedFailoverTaskInstances(String host);
+
+ DataSource findDataSourceById(int id);
+
+ int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus);
+
+ ProcessInstance findProcessInstanceByTaskId(int taskId);
+
+ List<UdfFunc> queryUdfFunListByIds(Integer[] ids);
+
+ String queryTenantCodeByResName(String resName, ResourceType resourceType);
+
+ List<Schedule> selectAllByProcessDefineCode(long[] codes);
+
+ ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval);
+
+ ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval);
+
+ ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime);
+
+ String queryUserQueueByProcessInstance(ProcessInstance processInstance);
+
+ ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId);
+
+ String getTaskWorkerGroup(TaskInstance taskInstance);
+
+ List<Project> getProjectListHavePerm(int userId);
+
+ <T> List<T> listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType);
+
+ User getUserById(int userId);
+
+ Resource getResourceById(int resourceId);
+
+ List<Resource> listResourceByIds(Integer[] resIds);
+
+ String formatTaskAppId(TaskInstance taskInstance);
+
+ int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog);
+
+ int switchProcessTaskRelationVersion(ProcessDefinition processDefinition);
+
+ int switchTaskDefinitionVersion(long taskCode, int taskVersion);
+
+ String getResourceIds(TaskDefinition taskDefinition);
+
+ int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs, Boolean syncDefine);
+
+ int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean syncDefine, Boolean isFromProcessDefine);
+
+ int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion,
+ List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs,
+ Boolean syncDefine);
+
+ boolean isTaskOnline(long taskCode);
+
+ DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition);
+
+ DagData genDagData(ProcessDefinition processDefinition);
+
+ List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations);
+
+ List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations);
+
+ TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion);
+
+ List<ProcessTaskRelation> findRelationByCode(long processDefinitionCode, int processDefinitionVersion);
+
+ List<TaskNode> transformTask(List<ProcessTaskRelation> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs);
+
+ Map<ProcessInstance, TaskInstance> notifyProcessList(int processId);
+
+ DqExecuteResult getDqExecuteResultByTaskInstanceId(int taskInstanceId);
+
+ int updateDqExecuteResultUserId(int taskInstanceId);
+
+ int updateDqExecuteResultState(DqExecuteResult dqExecuteResult);
+
+ int deleteDqExecuteResultByTaskInstanceId(int taskInstanceId);
+
+ int deleteTaskStatisticsValueByTaskInstanceId(int taskInstanceId);
+
+ DqRule getDqRule(int ruleId);
+
+ List<DqRuleInputEntry> getRuleInputEntry(int ruleId);
+
+ List<DqRuleExecuteSql> getDqExecuteSql(int ruleId);
+
+ DqComparisonType getComparisonTypeById(int id);
+
+ boolean acquireTaskGroup(int taskId,
+ String taskName, int groupId,
+ int processId, int priority);
+
+ boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue);
+
+ boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue);
+
+ void releaseAllTaskGroup(int processInstanceId);
+
+ TaskInstance releaseTaskGroup(TaskInstance taskInstance);
+
+ void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status);
+
+ TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
+ String taskName, Integer groupId,
+ Integer processId, Integer priority, TaskGroupQueueStatus status);
+
+ int updateTaskGroupQueueStatus(Integer taskId, int status);
+
+ int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue);
+
+ TaskGroupQueue loadTaskGroupQueue(int taskId);
+
+ void sendStartTask2Master(ProcessInstance processInstance, int taskId,
+ org.apache.dolphinscheduler.remote.command.CommandType taskType);
+
+ ProcessInstance loadNextProcess4Serial(long code, int state);
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
similarity index 97%
copy from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
copy to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 2255c0811b..e6d03a8aaa 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,19 +17,13 @@
package org.apache.dolphinscheduler.service.process;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
-
-import static java.util.stream.Collectors.toSet;
-
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -130,10 +124,11 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
@@ -148,22 +143,23 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
/**
* process relative dao that some mappers in this.
*/
@Component
-public class ProcessService {
+public class ProcessServiceImpl implements ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -271,11 +267,12 @@ public class ProcessService {
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
- * @param logger logger
- * @param host host
+ * @param logger logger
+ * @param host host
* @param command found command
* @return process instance
*/
+ @Override
@Transactional
public ProcessInstance handleCommand(Logger logger, String host, Command command) {
ProcessInstance processInstance = constructProcessInstance(command, host);
@@ -364,6 +361,7 @@ public class ProcessService {
* @param command command
* @param message message
*/
+ @Override
public void moveToErrorCommand(Command command, String message) {
ErrorCommand errorCommand = new ErrorCommand(command, message);
this.errorCommandMapper.insert(errorCommand);
@@ -373,7 +371,7 @@ public class ProcessService {
/**
* set process waiting thread
*
- * @param command command
+ * @param command command
* @param processInstance processInstance
* @return process instance
*/
@@ -394,6 +392,7 @@ public class ProcessService {
* @param command command
* @return create result
*/
+ @Override
public int createCommand(Command command) {
int result = 0;
if (command != null) {
@@ -405,6 +404,7 @@ public class ProcessService {
/**
* get command page
*/
+ @Override
public List<Command> findCommandPage(int pageSize, int pageNumber) {
return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize);
}
@@ -412,6 +412,7 @@ public class ProcessService {
/**
* get command page
*/
+ @Override
public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
if (masterCount <= 0) {
return Lists.newArrayList();
@@ -425,6 +426,7 @@ public class ProcessService {
* @param command command
* @return create command result
*/
+ @Override
public boolean verifyIsNeedCreateCommand(Command command) {
boolean isNeedCreate = true;
EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
@@ -458,6 +460,7 @@ public class ProcessService {
* @param processId processId
* @return process instance
*/
+ @Override
public ProcessInstance findProcessInstanceDetailById(int processId) {
return processInstanceMapper.queryDetailById(processId);
}
@@ -465,6 +468,7 @@ public class ProcessService {
/**
* get task node list by definitionId
*/
+ @Override
public List<TaskDefinition> getTaskNodeListByDefinition(long defineCode) {
ProcessDefinition processDefinition = processDefineMapper.queryByCode(defineCode);
if (processDefinition == null) {
@@ -491,6 +495,7 @@ public class ProcessService {
* @param processId processId
* @return process instance
*/
+ @Override
public ProcessInstance findProcessInstanceById(int processId) {
return processInstanceMapper.selectById(processId);
}
@@ -501,6 +506,7 @@ public class ProcessService {
* @param processDefinitionId processDefinitionId
* @return process definition
*/
+ @Override
public ProcessDefinition findProcessDefineById(int processDefinitionId) {
return processDefineMapper.selectById(processDefinitionId);
}
@@ -511,6 +517,7 @@ public class ProcessService {
* @param processDefinitionCode processDefinitionCode
* @return process definition
*/
+ @Override
public ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version) {
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
if (processDefinition == null || processDefinition.getVersion() != version) {
@@ -528,6 +535,7 @@ public class ProcessService {
* @param processDefinitionCode processDefinitionCode
* @return process definition
*/
+ @Override
public ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode) {
return processDefineMapper.queryByCode(processDefinitionCode);
}
@@ -538,6 +546,7 @@ public class ProcessService {
* @param processInstanceId processInstanceId
* @return delete process instance result
*/
+ @Override
public int deleteWorkProcessInstanceById(int processInstanceId) {
return processInstanceMapper.deleteById(processInstanceId);
}
@@ -548,6 +557,7 @@ public class ProcessService {
* @param processInstanceId processInstanceId
* @return delete all sub process instance result
*/
+ @Override
public int deleteAllSubWorkProcessByParentId(int processInstanceId) {
List<Integer> subProcessIdList = processInstanceMapMapper.querySubIdListByParentId(processInstanceId);
@@ -566,6 +576,7 @@ public class ProcessService {
*
* @param processInstanceId processInstanceId
*/
+ @Override
public void removeTaskLogFile(Integer processInstanceId) {
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
if (CollectionUtils.isEmpty(taskInstanceList)) {
@@ -587,6 +598,7 @@ public class ProcessService {
/**
* recursive delete all task instance by process instance id
*/
+ @Override
public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) {
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
if (CollectionUtils.isEmpty(taskInstanceList)) {
@@ -606,8 +618,9 @@ public class ProcessService {
* recursive query sub process definition id by parent id.
*
* @param parentCode parentCode
- * @param ids ids
+ * @param ids ids
*/
+ @Override
public void recurseFindSubProcess(long parentCode, List<Long> ids) {
List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinition(parentCode);
@@ -631,9 +644,10 @@ public class ProcessService {
* create recovery waiting thread command and delete origin command at the same time.
* if the recovery command is exists, only update the field update_time
*
- * @param originCommand originCommand
+ * @param originCommand originCommand
* @param processInstance processInstance
*/
+ @Override
public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
// sub process doesnot need to create wait command
@@ -687,7 +701,7 @@ public class ProcessService {
/**
* get schedule time from command
*
- * @param command command
+ * @param command command
* @param cmdParam cmdParam map
* @return date
*/
@@ -716,8 +730,8 @@ public class ProcessService {
* generate a new work process instance from command.
*
* @param processDefinition processDefinition
- * @param command command
- * @param cmdParam cmdParam map
+ * @param command command
+ * @param cmdParam cmdParam map
* @return process instance
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
@@ -802,9 +816,10 @@ public class ProcessService {
* use definition creator's tenant.
*
* @param tenantId tenantId
- * @param userId userId
+ * @param userId userId
* @return tenant
*/
+ @Override
public Tenant getTenantForProcess(int tenantId, int userId) {
Tenant tenant = null;
if (tenantId >= 0) {
@@ -829,6 +844,7 @@ public class ProcessService {
* @param environmentCode environmentCode
* @return Environment
*/
+ @Override
public Environment findEnvironmentByCode(Long environmentCode) {
Environment environment = null;
if (environmentCode >= 0) {
@@ -840,7 +856,7 @@ public class ProcessService {
/**
* check command parameters is valid
*
- * @param command command
+ * @param command command
* @param cmdParam cmdParam map
* @return whether command param is valid
*/
@@ -860,7 +876,7 @@ public class ProcessService {
* construct process instance according to one command.
*
* @param command command
- * @param host host
+ * @param host host
* @return process instance
*/
protected ProcessInstance constructProcessInstance(Command command, String host) {
@@ -1039,7 +1055,7 @@ public class ProcessService {
* return complement data if the process start with complement data
*
* @param processInstance processInstance
- * @param command command
+ * @param command command
* @return command type
*/
private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
@@ -1054,8 +1070,8 @@ public class ProcessService {
* initialize complement data parameters
*
* @param processDefinition processDefinition
- * @param processInstance processInstance
- * @param cmdParam cmdParam
+ * @param processInstance processInstance
+ * @param cmdParam cmdParam
*/
private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance,
@@ -1086,6 +1102,7 @@ public class ProcessService {
*
* @param subProcessInstance subProcessInstance
*/
+ @Override
public void setSubProcessParam(ProcessInstance subProcessInstance) {
String cmdParam = subProcessInstance.getCommandParam();
if (StringUtils.isEmpty(cmdParam)) {
@@ -1128,7 +1145,7 @@ public class ProcessService {
* only the keys doesn't in sub process global would be joined.
*
* @param parentGlobalParams parentGlobalParams
- * @param subGlobalParams subGlobalParams
+ * @param subGlobalParams subGlobalParams
* @return global params join
*/
private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
@@ -1170,6 +1187,7 @@ public class ProcessService {
/**
* retry submit task to db
*/
+ @Override
public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
int retryTimes = 1;
TaskInstance task = null;
@@ -1195,9 +1213,10 @@ public class ProcessService {
* submit sub process to command
*
* @param processInstance processInstance
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @return task instance
*/
+ @Override
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("start submit task : {}, instance id:{}, state: {}",
@@ -1226,7 +1245,7 @@ public class ProcessService {
* set map {parent instance id, task instance id, 0(child instance id)}
*
* @param parentInstance parentInstance
- * @param parentTask parentTask
+ * @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
@@ -1255,7 +1274,7 @@ public class ProcessService {
* find previous task work process map.
*
* @param parentProcessInstance parentProcessInstance
- * @param parentTask parentTask
+ * @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
@@ -1281,8 +1300,9 @@ public class ProcessService {
* create sub work process command
*
* @param parentProcessInstance parentProcessInstance
- * @param task task
+ * @param task task
*/
+ @Override
public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
if (!task.isSubProcess()) {
return;
@@ -1327,6 +1347,7 @@ public class ProcessService {
return processMapStr;
}
+ @Override
public Map<String, String> getGlobalParamMap(String globalParams) {
List<Property> propList;
Map<String, String> globalParamMap = new HashMap<>();
@@ -1341,6 +1362,7 @@ public class ProcessService {
/**
* create sub work process command
*/
+ @Override
public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
ProcessInstance childInstance,
ProcessInstanceMap instanceMap,
@@ -1415,7 +1437,7 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
- * @param childDefinitionCode childDefinitionId
+ * @param childDefinitionCode childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
@@ -1430,10 +1452,11 @@ public class ProcessService {
/**
* submit task to mysql
*
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
+ @Override
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
ExecutionStatus processInstanceState = processInstance.getState();
if (processInstanceState.typeIsFinished()
@@ -1466,10 +1489,11 @@ public class ProcessService {
* return stop if work process state is ready stop
* if all of above are not satisfied, return submit success
*
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @param processInstance processInstance
* @return process instance state
*/
+ @Override
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
ExecutionStatus state = taskInstance.getState();
// running, delayed or killed
@@ -1523,6 +1547,7 @@ public class ProcessService {
*
* @param processInstance processInstance
*/
+ @Override
public void saveProcessInstance(ProcessInstance processInstance) {
if (processInstance == null) {
logger.error("save error, process instance is null!");
@@ -1541,6 +1566,7 @@ public class ProcessService {
* @param command command
* @return save command result
*/
+ @Override
public int saveCommand(Command command) {
if (command.getId() != 0) {
return commandMapper.updateById(command);
@@ -1555,6 +1581,7 @@ public class ProcessService {
* @param taskInstance taskInstance
* @return save task instance result
*/
+ @Override
public boolean saveTaskInstance(TaskInstance taskInstance) {
if (taskInstance.getId() != 0) {
return updateTaskInstance(taskInstance);
@@ -1569,6 +1596,7 @@ public class ProcessService {
* @param taskInstance taskInstance
* @return create task instance result
*/
+ @Override
public boolean createTaskInstance(TaskInstance taskInstance) {
int count = taskInstanceMapper.insert(taskInstance);
return count > 0;
@@ -1580,6 +1608,7 @@ public class ProcessService {
* @param taskInstance taskInstance
* @return update task instance result
*/
+ @Override
public boolean updateTaskInstance(TaskInstance taskInstance) {
int count = taskInstanceMapper.updateById(taskInstance);
return count > 0;
@@ -1591,6 +1620,7 @@ public class ProcessService {
* @param taskId task id
* @return task instance
*/
+ @Override
public TaskInstance findTaskInstanceById(Integer taskId) {
return taskInstanceMapper.selectById(taskId);
}
@@ -1601,6 +1631,7 @@ public class ProcessService {
* @param idList task id list
* @return task instance list
*/
+ @Override
public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
if (CollectionUtils.isEmpty(idList)) {
return new ArrayList<>();
@@ -1611,6 +1642,7 @@ public class ProcessService {
/**
* package task instance
*/
+ @Override
public void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance) {
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processInstance.getProcessDefinition());
@@ -1626,6 +1658,7 @@ public class ProcessService {
*
* @param taskDefinition the given {@link TaskDefinition}
*/
+ @Override
public void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
Map<String, Object> taskParameters = JSONUtils.parseObject(
taskDefinition.getTaskParams(),
@@ -1693,9 +1726,10 @@ public class ProcessService {
* get id list by task state
*
* @param instanceId instanceId
- * @param state state
+ * @param state state
* @return task instance states
*/
+ @Override
public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal());
}
@@ -1706,6 +1740,7 @@ public class ProcessService {
* @param processInstanceId processInstanceId
* @return task instance list
*/
+ @Override
public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId) {
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES);
}
@@ -1716,6 +1751,7 @@ public class ProcessService {
* @param processInstanceId processInstanceId
* @return task instance list
*/
+ @Override
public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) {
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO);
}
@@ -1726,6 +1762,7 @@ public class ProcessService {
* @param processInstanceMap processInstanceMap
* @return update process instance result
*/
+ @Override
public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
return processInstanceMapMapper.updateById(processInstanceMap);
}
@@ -1736,6 +1773,7 @@ public class ProcessService {
* @param processInstanceMap processInstanceMap
* @return create process instance result
*/
+ @Override
public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
int count = 0;
if (processInstanceMap != null) {
@@ -1748,9 +1786,10 @@ public class ProcessService {
* find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId
- * @param parentTaskId parentTaskId
+ * @param parentTaskId parentTaskId
* @return process instance map
*/
+ @Override
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
}
@@ -1761,6 +1800,7 @@ public class ProcessService {
* @param parentWorkProcessId parentWorkProcessId
* @return delete process map result
*/
+ @Override
public int deleteWorkProcessMapByParentId(int parentWorkProcessId) {
return processInstanceMapMapper.deleteByParentProcessId(parentWorkProcessId);
@@ -1770,9 +1810,10 @@ public class ProcessService {
* find sub process instance
*
* @param parentProcessId parentProcessId
- * @param parentTaskId parentTaskId
+ * @param parentTaskId parentTaskId
* @return process instance
*/
+ @Override
public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
ProcessInstance processInstance = null;
ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryByParentId(parentProcessId, parentTaskId);
@@ -1789,6 +1830,7 @@ public class ProcessService {
* @param subProcessId subProcessId
* @return process instance
*/
+ @Override
public ProcessInstance findParentProcessInstance(Integer subProcessId) {
ProcessInstance processInstance = null;
ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(subProcessId);
@@ -1805,6 +1847,7 @@ public class ProcessService {
* @param processInstance processInstance
* @return update process instance result
*/
+ @Override
public int updateProcessInstance(ProcessInstance processInstance) {
return processInstanceMapper.updateById(processInstance);
}
@@ -1812,6 +1855,7 @@ public class ProcessService {
/**
* for show in page of taskInstance
*/
+ @Override
public void changeOutParam(TaskInstance taskInstance) {
if (StringUtils.isEmpty(taskInstance.getVarPool())) {
return;
@@ -1850,6 +1894,7 @@ public class ProcessService {
* @param intList intList
* @return string list
*/
+ @Override
public List<String> convertIntListToString(List<Integer> intList) {
if (intList == null) {
return new ArrayList<>();
@@ -1867,6 +1912,7 @@ public class ProcessService {
* @param id id
* @return schedule
*/
+ @Override
public Schedule querySchedule(int id) {
return scheduleMapper.selectById(id);
}
@@ -1877,6 +1923,7 @@ public class ProcessService {
* @param processDefinitionCode processDefinitionCode
* @see Schedule
*/
+ @Override
public List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode) {
return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
}
@@ -1887,6 +1934,7 @@ public class ProcessService {
* @param processDefinitionCodeList processDefinitionCodeList
* @see Schedule
*/
+ @Override
public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
List<Schedule> processDefinitionScheduleList = scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
@@ -1899,6 +1947,7 @@ public class ProcessService {
* @param processDefinitionCode processDefinitionCode
* @see DependentProcessDefinition
*/
+ @Override
public List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode) {
return workFlowLineageMapper.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
}
@@ -1909,10 +1958,12 @@ public class ProcessService {
* @param host host
* @return process instance list
*/
+ @Override
public List<ProcessInstance> queryNeedFailoverProcessInstances(String host) {
return processInstanceMapper.queryByHostAndStatus(host, stateArray);
}
+ @Override
public List<String> queryNeedFailoverProcessInstanceHost() {
return processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray);
}
@@ -1922,6 +1973,7 @@ public class ProcessService {
*
* @param processInstance processInstance
*/
+ @Override
@Transactional(rollbackFor = RuntimeException.class)
public void processNeedFailoverProcessInstances(ProcessInstance processInstance) {
//1 update processInstance host is null
@@ -1947,6 +1999,7 @@ public class ProcessService {
* @param host host
* @return task instance list
*/
+ @Override
public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
return taskInstanceMapper.queryByHostAndStatus(host,
stateArray);
@@ -1958,6 +2011,7 @@ public class ProcessService {
* @param id id
* @return datasource
*/
+ @Override
public DataSource findDataSourceById(int id) {
return dataSourceMapper.selectById(id);
}
@@ -1966,9 +2020,10 @@ public class ProcessService {
* update process instance state by id
*
* @param processInstanceId processInstanceId
- * @param executionStatus executionStatus
+ * @param executionStatus executionStatus
* @return update process result
*/
+ @Override
public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
ProcessInstance instance = processInstanceMapper.selectById(processInstanceId);
instance.setState(executionStatus);
@@ -1981,6 +2036,7 @@ public class ProcessService {
* @param taskId taskId
* @return process instance
*/
+ @Override
public ProcessInstance findProcessInstanceByTaskId(int taskId) {
TaskInstance taskInstance = taskInstanceMapper.selectById(taskId);
if (taskInstance != null) {
@@ -1995,6 +2051,7 @@ public class ProcessService {
* @param ids ids
* @return udf function list
*/
+ @Override
public List<UdfFunc> queryUdfFunListByIds(Integer[] ids) {
return udfFuncMapper.queryUdfByIdStr(ids, null);
}
@@ -2002,10 +2059,11 @@ public class ProcessService {
/**
* find tenant code by resource name
*
- * @param resName resource name
+ * @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
+ @Override
public String queryTenantCodeByResName(String resName, ResourceType resourceType) {
// in order to query tenant code successful although the version is older
String fullName = resName.startsWith("/") ? resName : String.format("/%s", resName);
@@ -2032,6 +2090,7 @@ public class ProcessService {
* @param codes codes
* @return schedule list
*/
+ @Override
public List<Schedule> selectAllByProcessDefineCode(long[] codes) {
return scheduleMapper.selectAllByProcessDefineArray(codes);
}
@@ -2040,9 +2099,10 @@ public class ProcessService {
* find last scheduler process instance in the date interval
*
* @param definitionCode definitionCode
- * @param dateInterval dateInterval
+ * @param dateInterval dateInterval
* @return process instance
*/
+ @Override
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
dateInterval.getStartTime(),
@@ -2053,9 +2113,10 @@ public class ProcessService {
* find last manual process instance interval
*
* @param definitionCode process definition code
- * @param dateInterval dateInterval
+ * @param dateInterval dateInterval
* @return process instance
*/
+ @Override
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastManualProcess(definitionCode,
dateInterval.getStartTime(),
@@ -2066,10 +2127,11 @@ public class ProcessService {
* find last running process instance
*
* @param definitionCode process definition code
- * @param startTime start time
- * @param endTime end time
+ * @param startTime start time
+ * @param endTime end time
* @return process instance
*/
+ @Override
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime,
@@ -2083,6 +2145,7 @@ public class ProcessService {
* @param processInstance processInstance
* @return queue
*/
+ @Override
public String queryUserQueueByProcessInstance(ProcessInstance processInstance) {
String queue = "";
@@ -2102,6 +2165,7 @@ public class ProcessService {
* @param processInstanceId processInstanceId
* @return projectName and userName
*/
+ @Override
public ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId) {
return projectMapper.queryProjectWithUserByProcessInstanceId(processInstanceId);
}
@@ -2112,6 +2176,7 @@ public class ProcessService {
* @param taskInstance taskInstance
* @return workerGroupId
*/
+ @Override
public String getTaskWorkerGroup(TaskInstance taskInstance) {
String workerGroup = taskInstance.getWorkerGroup();
@@ -2134,6 +2199,7 @@ public class ProcessService {
* @param userId userId
* @return project list
*/
+ @Override
public List<Project> getProjectListHavePerm(int userId) {
List<Project> createProjects = projectMapper.queryProjectCreatedByUser(userId);
List<Project> authedProjects = projectMapper.queryAuthedProjectListByUserId(userId);
@@ -2151,10 +2217,11 @@ public class ProcessService {
/**
* list unauthorized udf function
*
- * @param userId user id
+ * @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
+ @Override
public <T> List<T> listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType) {
List<T> resultList = new ArrayList<>();
@@ -2199,6 +2266,7 @@ public class ProcessService {
* @param userId user id
* @return User
*/
+ @Override
public User getUserById(int userId) {
return userMapper.selectById(userId);
}
@@ -2209,6 +2277,7 @@ public class ProcessService {
* @param resourceId resource id
* @return Resource
*/
+ @Override
public Resource getResourceById(int resourceId) {
return resourceMapper.selectById(resourceId);
}
@@ -2219,6 +2288,7 @@ public class ProcessService {
* @param resIds resIds
* @return resource list
*/
+ @Override
public List<Resource> listResourceByIds(Integer[] resIds) {
return resourceMapper.listResourceByIds(resIds);
}
@@ -2226,6 +2296,7 @@ public class ProcessService {
/**
* format task app id in task instance
*/
+ @Override
public String formatTaskAppId(TaskInstance taskInstance) {
ProcessInstance processInstance = findProcessInstanceById(taskInstance.getProcessInstanceId());
if (processInstance == null) {
@@ -2241,6 +2312,7 @@ public class ProcessService {
/**
* switch process definition version to process definition log version
*/
+ @Override
public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
if (null == processDefinition || null == processDefinitionLog) {
return Constants.DEFINITION_FAILURE;
@@ -2259,6 +2331,7 @@ public class ProcessService {
return result;
}
+ @Override
public int switchProcessTaskRelationVersion(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) {
@@ -2280,6 +2353,7 @@ public class ProcessService {
}
}
+ @Override
public int switchTaskDefinitionVersion(long taskCode, int taskVersion) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
@@ -2303,6 +2377,7 @@ public class ProcessService {
* @param taskDefinition taskDefinition
* @return resource ids
*/
+ @Override
public String getResourceIds(TaskDefinition taskDefinition) {
Set<Integer> resourceIds = null;
AbstractParameters params = taskPluginManager.getParameters(ParametersNode.builder().taskType(taskDefinition.getTaskType()).taskParams(taskDefinition.getTaskParams()).build());
@@ -2320,6 +2395,7 @@ public class ProcessService {
return StringUtils.join(resourceIds, ",");
}
+ @Override
public int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs, Boolean syncDefine) {
Date now = new Date();
List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
@@ -2391,6 +2467,7 @@ public class ProcessService {
/**
* save processDefinition (including create or update processDefinition)
*/
+ @Override
public int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean syncDefine, Boolean isFromProcessDefine) {
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
@@ -2415,6 +2492,7 @@ public class ProcessService {
/**
* save task relations
*/
+ @Override
public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion,
List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs,
Boolean syncDefine) {
@@ -2464,6 +2542,7 @@ public class ProcessService {
return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
+ @Override
public boolean isTaskOnline(long taskCode) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
@@ -2489,6 +2568,7 @@ public class ProcessService {
* @param processDefinition process definition
* @return dag graph
*/
+ @Override
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskNode> taskNodeList = transformTask(taskRelations, Lists.newArrayList());
@@ -2500,6 +2580,7 @@ public class ProcessService {
/**
* generate DagData
*/
+ @Override
public DagData genDagData(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(taskRelations);
@@ -2507,6 +2588,7 @@ public class ProcessService {
return new DagData(processDefinition, taskRelations, taskDefinitions);
}
+ @Override
public List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations) {
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
@@ -2523,6 +2605,7 @@ public class ProcessService {
return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}
+ @Override
public List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
@@ -2543,6 +2626,7 @@ public class ProcessService {
/**
* find task definition by code and version
*/
+ @Override
public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
}
@@ -2550,6 +2634,7 @@ public class ProcessService {
/**
* find process task relation list by process
*/
+ @Override
public List<ProcessTaskRelation> findRelationByCode(long processDefinitionCode, int processDefinitionVersion) {
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinitionCode, processDefinitionVersion);
return processTaskRelationLogList.stream().map(r -> (ProcessTaskRelation) r).collect(Collectors.toList());
@@ -2559,7 +2644,7 @@ public class ProcessService {
* add authorized resources
*
* @param ownResources own resources
- * @param userId userId
+ * @param userId userId
*/
private void addAuthorizedResources(List<Resource> ownResources, int userId) {
List<Integer> relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7);
@@ -2570,6 +2655,7 @@ public class ProcessService {
/**
* Use temporarily before refactoring taskNode
*/
+ @Override
public List<TaskNode> transformTask(List<ProcessTaskRelation> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) {
Map<Long, List<Long>> taskCodeMap = new HashMap<>();
for (ProcessTaskRelation processTaskRelation : taskRelationList) {
@@ -2624,6 +2710,7 @@ public class ProcessService {
return taskNodeList;
}
+ @Override
public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId) {
HashMap<ProcessInstance, TaskInstance> processTaskMap = new HashMap<>();
//find sub tasks
@@ -2640,10 +2727,12 @@ public class ProcessService {
return processTaskMap;
}
+ @Override
public DqExecuteResult getDqExecuteResultByTaskInstanceId(int taskInstanceId) {
return dqExecuteResultMapper.getExecuteResultById(taskInstanceId);
}
+ @Override
public int updateDqExecuteResultUserId(int taskInstanceId) {
DqExecuteResult dqExecuteResult =
dqExecuteResultMapper.selectOne(new QueryWrapper<DqExecuteResult>().eq(TASK_INSTANCE_ID, taskInstanceId));
@@ -2667,34 +2756,41 @@ public class ProcessService {
return dqExecuteResultMapper.updateById(dqExecuteResult);
}
+ @Override
public int updateDqExecuteResultState(DqExecuteResult dqExecuteResult) {
return dqExecuteResultMapper.updateById(dqExecuteResult);
}
+ @Override
public int deleteDqExecuteResultByTaskInstanceId(int taskInstanceId) {
return dqExecuteResultMapper.delete(
new QueryWrapper<DqExecuteResult>()
.eq(TASK_INSTANCE_ID, taskInstanceId));
}
+ @Override
public int deleteTaskStatisticsValueByTaskInstanceId(int taskInstanceId) {
return dqTaskStatisticsValueMapper.delete(
new QueryWrapper<DqTaskStatisticsValue>()
.eq(TASK_INSTANCE_ID, taskInstanceId));
}
+ @Override
public DqRule getDqRule(int ruleId) {
return dqRuleMapper.selectById(ruleId);
}
+ @Override
public List<DqRuleInputEntry> getRuleInputEntry(int ruleId) {
return DqRuleUtils.transformInputEntry(dqRuleInputEntryMapper.getRuleInputEntryList(ruleId));
}
+ @Override
public List<DqRuleExecuteSql> getDqExecuteSql(int ruleId) {
return dqRuleExecuteSqlMapper.getExecuteSqlList(ruleId);
}
+ @Override
public DqComparisonType getComparisonTypeById(int id) {
return dqComparisonTypeMapper.selectById(id);
}
@@ -2704,6 +2800,7 @@ public class ProcessService {
*
* @param taskId task id
*/
+ @Override
public boolean acquireTaskGroup(int taskId,
String taskName, int groupId,
int processId, int priority) {
@@ -2744,6 +2841,7 @@ public class ProcessService {
/**
* try to get the task group resource(when other task release the resource)
*/
+ @Override
public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), taskGroupQueue.getId(),
@@ -2757,10 +2855,12 @@ public class ProcessService {
return false;
}
+ @Override
public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) {
return robTaskGroupResouce(taskGroupQueue);
}
+ @Override
public void releaseAllTaskGroup(int processInstanceId) {
List<TaskInstance> taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
for (TaskInstance info : taskInstances) {
@@ -2773,6 +2873,7 @@ public class ProcessService {
*
* @return the result code and msg
*/
+ @Override
public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
@@ -2819,6 +2920,7 @@ public class ProcessService {
* @return the result code and msg
*/
+ @Override
public void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status) {
TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskId);
taskGroupQueue.setStatus(status);
@@ -2829,13 +2931,14 @@ public class ProcessService {
/**
* insert into task group queue
*
- * @param taskId task id
- * @param taskName task name
- * @param groupId group id
+ * @param taskId task id
+ * @param taskName task name
+ * @param groupId group id
* @param processId process id
- * @param priority priority
+ * @param priority priority
* @return result and msg code
*/
+ @Override
public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
String taskName, Integer groupId,
Integer processId, Integer priority, TaskGroupQueueStatus status) {
@@ -2846,18 +2949,22 @@ public class ProcessService {
return taskGroupQueue;
}
+ @Override
public int updateTaskGroupQueueStatus(Integer taskId, int status) {
return taskGroupQueueMapper.updateStatusByTaskId(taskId, status);
}
+ @Override
public int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue) {
return taskGroupQueueMapper.updateById(taskGroupQueue);
}
+ @Override
public TaskGroupQueue loadTaskGroupQueue(int taskId) {
return this.taskGroupQueueMapper.queryByTaskId(taskId);
}
+ @Override
public void sendStartTask2Master(ProcessInstance processInstance, int taskId,
org.apache.dolphinscheduler.remote.command.CommandType taskType) {
String host = processInstance.getHost();
@@ -2869,6 +2976,7 @@ public class ProcessService {
stateEventCallbackService.sendResult(address, port, taskEventChangeCommand.convert2Command(taskType));
}
+ @Override
public ProcessInstance loadNextProcess4Serial(long code, int state) {
return this.processInstanceMapper.loadNextProcess4Serial(code, state);
}
@@ -2879,4 +2987,4 @@ public class ProcessService {
throw new ServiceException("delete command fail, id:" + commandId);
}
}
-}
+}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index c57e6ba5c5..e8abd0692e 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -17,12 +17,7 @@
package org.apache.dolphinscheduler.service.process;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-
-import static org.mockito.ArgumentMatchers.any;
-
+import com.fasterxml.jackson.databind.JsonNode;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
@@ -78,14 +73,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
import org.apache.dolphinscheduler.spi.params.base.FormType;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -99,7 +86,17 @@ import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.mockito.ArgumentMatchers.any;
/**
* process service test
@@ -113,7 +110,7 @@ public class ProcessServiceTest {
public final ExpectedException exception = ExpectedException.none();
@InjectMocks
- private ProcessService processService;
+ private ProcessServiceImpl processService;
@Mock
private CommandMapper commandMapper;
@Mock
@@ -850,8 +847,8 @@ public class ProcessServiceTest {
int pageNumber = 0;
int masterCount = 0;
int thisMasterSlot = 2;
- List<Command> commandList = processService.findCommandPageBySlot(pageSize,pageNumber,masterCount,thisMasterSlot);
- Assert.assertEquals(0,commandList.size());
+ List<Command> commandList = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+ Assert.assertEquals(0, commandList.size());
}
private TaskGroupQueue getTaskGroupQueue() {