You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2021/08/04 01:42:52 UTC
[dolphinscheduler] branch json_split_two updated: check has cycle of ProcessDefinition (#5944)
This is an automated email from the ASF dual-hosted git repository.
wenhemin pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split_two by this push:
new 316b919 check has cycle of ProcessDefinition (#5944)
316b919 is described below
commit 316b919d4f6b100a4129c5f975b7bf24559bde5a
Author: JinyLeeChina <42...@users.noreply.github.com>
AuthorDate: Wed Aug 4 09:42:44 2021 +0800
check has cycle of ProcessDefinition (#5944)
Co-authored-by: JinyLeeChina <29...@qq.com>
---
.../api/service/ProcessDefinitionService.java | 9 -
.../service/impl/ProcessDefinitionServiceImpl.java | 24 +-
.../service/process/ProcessService.java | 255 ++++++++++++---------
3 files changed, 158 insertions(+), 130 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 5fcf581..a49a08f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -324,14 +324,5 @@ public interface ProcessDefinitionService {
long projectCode,
int processDefinitionId,
long version);
-
- /**
- * check has associated process definition
- *
- * @param processDefinitionId process definition id
- * @param version version
- * @return The query result has a specific process definition return true
- */
- boolean checkHasAssociatedProcessDefinition(int processDefinitionId, long version);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 65e2b10..f0fcec3 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -255,12 +255,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
- // TODO check has cycle
- // if (graphHasCycle(taskRelationList)) {
- // logger.error("process DAG has cycle");
- // putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
- // return result;
- // }
+ if (graphHasCycle(processService.transformTask(taskRelationList))) {
+ logger.error("process DAG has cycle");
+ putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
+ return result;
+ }
// check whether the task relation json is normal
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
@@ -1324,19 +1323,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
/**
- * check has associated process definition
- *
- * @param processDefinitionId process definition id
- * @param version version
- * @return The query result has a specific process definition return true
- */
- @Override
- public boolean checkHasAssociatedProcessDefinition(int processDefinitionId, long version) {
- Integer hasAssociatedDefinitionId = processDefinitionMapper.queryHasAssociatedDefinitionByIdAndVersion(processDefinitionId, version);
- return Objects.nonNull(hasAssociatedDefinitionId);
- }
-
- /**
* query the pagination versions info by one certain process definition code
*
* @param loginUser login user info to check auth
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bd5e4df..ac91b06 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -139,10 +139,10 @@ public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
- ExecutionStatus.RUNNING_EXECUTION.ordinal(),
- ExecutionStatus.DELAY_EXECUTION.ordinal(),
- ExecutionStatus.READY_PAUSE.ordinal(),
- ExecutionStatus.READY_STOP.ordinal()};
+ ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ ExecutionStatus.DELAY_EXECUTION.ordinal(),
+ ExecutionStatus.READY_PAUSE.ordinal(),
+ ExecutionStatus.READY_STOP.ordinal()};
@Autowired
private UserMapper userMapper;
@@ -526,17 +526,17 @@ public class ProcessService {
// process instance quit by "waiting thread" state
if (originCommand == null) {
Command command = new Command(
- CommandType.RECOVER_WAITING_THREAD,
- processInstance.getTaskDependType(),
- processInstance.getFailureStrategy(),
- processInstance.getExecutorId(),
- processInstance.getProcessDefinition().getCode(),
- JSONUtils.toJsonString(cmdParam),
- processInstance.getWarningType(),
- processInstance.getWarningGroupId(),
- processInstance.getScheduleTime(),
- processInstance.getWorkerGroup(),
- processInstance.getProcessInstancePriority()
+ CommandType.RECOVER_WAITING_THREAD,
+ processInstance.getTaskDependType(),
+ processInstance.getFailureStrategy(),
+ processInstance.getExecutorId(),
+ processInstance.getProcessDefinition().getCode(),
+ JSONUtils.toJsonString(cmdParam),
+ processInstance.getWarningType(),
+ processInstance.getWarningGroupId(),
+ processInstance.getScheduleTime(),
+ processInstance.getWorkerGroup(),
+ processInstance.getProcessInstancePriority()
);
saveCommand(command);
return;
@@ -617,10 +617,10 @@ public class ProcessService {
// curing global params
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ getCommandTypeIfComplement(processInstance, command),
+ processInstance.getScheduleTime()));
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
@@ -646,7 +646,7 @@ public class ProcessService {
startParamMap.putAll(fatherParamMap);
// set start param into global params
if (startParamMap.size() > 0
- && processDefinition.getGlobalParamMap() != null) {
+ && processDefinition.getGlobalParamMap() != null) {
for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
@@ -693,8 +693,8 @@ public class ProcessService {
private Boolean checkCmdParam(Command command, Map<String, String> cmdParam) {
if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) {
if (cmdParam == null
- || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
- || cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
+ || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
+ || cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType());
return false;
}
@@ -753,10 +753,10 @@ public class ProcessService {
// Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- commandTypeIfComplement,
- processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ commandTypeIfComplement,
+ processInstance.getScheduleTime()));
processInstance.setProcessDefinition(processDefinition);
}
//reset command parameter
@@ -804,7 +804,7 @@ public class ProcessService {
initTaskInstance(this.findTaskInstanceById(taskId));
}
cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
- String.join(Constants.COMMA, convertIntListToString(failedList)));
+ String.join(Constants.COMMA, convertIntListToString(failedList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
@@ -817,7 +817,7 @@ public class ProcessService {
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
- ExecutionStatus.KILL);
+ ExecutionStatus.KILL);
suspendedNodeList.addAll(stopNodeList);
for (Integer taskId : suspendedNodeList) {
// initialize the pause state
@@ -872,8 +872,6 @@ public class ProcessService {
* If it is a fault-tolerant command, get the specified version of ProcessDefinition through ProcessInstance
* Otherwise, get the latest version of ProcessDefinition
*
- * @param processDefinitionCode
- * @param cmdParam
* @return ProcessDefinition
*/
private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) {
@@ -894,7 +892,7 @@ public class ProcessService {
}
return processDefineLogMapper.queryByDefinitionCodeAndVersion(
- processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
+ processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
}
}
@@ -931,14 +929,14 @@ public class ProcessService {
}
Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE),
- YYYY_MM_DD_HH_MM_SS);
+ YYYY_MM_DD_HH_MM_SS);
if (Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(startComplementTime);
}
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
}
@@ -958,7 +956,7 @@ public class ProcessService {
Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
// write sub process id into cmd param.
if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
- && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
+ && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
paramMap.remove(CMD_PARAM_SUB_PROCESS);
paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
@@ -971,7 +969,7 @@ public class ProcessService {
ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
if (parentInstance != null) {
subProcessInstance.setGlobalParams(
- joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
+ joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
this.saveProcessInstance(subProcessInstance);
} else {
logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
@@ -1019,7 +1017,7 @@ public class ProcessService {
private void initTaskInstance(TaskInstance taskInstance) {
if (!taskInstance.isSubProcess()
- && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
+ && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
return;
@@ -1039,12 +1037,12 @@ public class ProcessService {
public TaskInstance submitTask(TaskInstance taskInstance) {
ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
logger.info("start submit task : {}, instance id:{}, state: {}",
- taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
+ taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
if (task == null) {
logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
- taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
+ taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
return task;
}
if (!task.getState().typeIsFinished()) {
@@ -1052,7 +1050,7 @@ public class ProcessService {
}
logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ",
- taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
+ taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
return task;
}
@@ -1110,7 +1108,7 @@ public class ProcessService {
}
}
logger.info("sub process instance is not found,parent task:{},parent instance:{}",
- parentTask.getId(), parentProcessInstance.getId());
+ parentTask.getId(), parentProcessInstance.getId());
return null;
}
@@ -1199,17 +1197,17 @@ public class ProcessService {
String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
return new Command(
- commandType,
- TaskDependType.TASK_POST,
- parentProcessInstance.getFailureStrategy(),
- parentProcessInstance.getExecutorId(),
- processDefinition.getCode(),
- processParam,
- parentProcessInstance.getWarningType(),
- parentProcessInstance.getWarningGroupId(),
- parentProcessInstance.getScheduleTime(),
- task.getWorkerGroup(),
- parentProcessInstance.getProcessInstancePriority()
+ commandType,
+ TaskDependType.TASK_POST,
+ parentProcessInstance.getFailureStrategy(),
+ parentProcessInstance.getExecutorId(),
+ processDefinition.getCode(),
+ processParam,
+ parentProcessInstance.getWarningType(),
+ parentProcessInstance.getWarningGroupId(),
+ parentProcessInstance.getScheduleTime(),
+ task.getWorkerGroup(),
+ parentProcessInstance.getProcessInstancePriority()
);
}
@@ -1246,7 +1244,7 @@ public class ProcessService {
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
- parentProcessInstance.getProcessDefinitionVersion());
+ parentProcessInstance.getProcessDefinitionVersion());
ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode);
if (childDefinition != null && fatherDefinition != null) {
childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());
@@ -1269,7 +1267,7 @@ public class ProcessService {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
} else {
if (processInstanceState != ExecutionStatus.READY_STOP
- && processInstanceState != ExecutionStatus.READY_PAUSE) {
+ && processInstanceState != ExecutionStatus.READY_PAUSE) {
// failure task set invalid
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
@@ -1320,9 +1318,9 @@ public class ProcessService {
// the task already exists in task queue
// return state
if (
- state == ExecutionStatus.RUNNING_EXECUTION
- || state == ExecutionStatus.DELAY_EXECUTION
- || state == ExecutionStatus.KILL
+ state == ExecutionStatus.RUNNING_EXECUTION
+ || state == ExecutionStatus.DELAY_EXECUTION
+ || state == ExecutionStatus.KILL
) {
return state;
}
@@ -1331,7 +1329,7 @@ public class ProcessService {
if (processInstanceState == ExecutionStatus.READY_PAUSE) {
state = ExecutionStatus.PAUSE;
} else if (processInstanceState == ExecutionStatus.READY_STOP
- || !checkProcessStrategy(taskInstance)) {
+ || !checkProcessStrategy(taskInstance)) {
state = ExecutionStatus.KILL;
} else {
state = ExecutionStatus.SUBMITTED_SUCCESS;
@@ -1355,7 +1353,7 @@ public class ProcessService {
for (TaskInstance task : taskInstances) {
if (task.getState() == ExecutionStatus.FAILURE
- && task.getRetryTimes() >= task.getMaxRetryTimes()) {
+ && task.getRetryTimes() >= task.getMaxRetryTimes()) {
return false;
}
}
@@ -1455,12 +1453,12 @@ public class ProcessService {
ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ processInstance.getProcessDefinitionVersion());
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
- taskInstance.getTaskCode(),
- taskInstance.getTaskDefinitionVersion());
+ taskInstance.getTaskCode(),
+ taskInstance.getTaskDefinitionVersion());
taskInstance.setTaskDefine(taskDefinition);
return taskInstance;
}
@@ -1632,7 +1630,6 @@ public class ProcessService {
/**
* for show in page of taskInstance
- * @param taskInstance
*/
public void changeOutParam(TaskInstance taskInstance) {
if (StringUtils.isEmpty(taskInstance.getVarPool())) {
@@ -1742,7 +1739,7 @@ public class ProcessService {
*/
public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
return taskInstanceMapper.queryByHostAndStatus(host,
- stateArray);
+ stateArray);
}
/**
@@ -1838,8 +1835,8 @@ public class ProcessService {
*/
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
- dateInterval.getStartTime(),
- dateInterval.getEndTime());
+ dateInterval.getStartTime(),
+ dateInterval.getEndTime());
}
/**
@@ -1851,8 +1848,8 @@ public class ProcessService {
*/
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastManualProcess(definitionCode,
- dateInterval.getStartTime(),
- dateInterval.getEndTime());
+ dateInterval.getStartTime(),
+ dateInterval.getEndTime());
}
/**
@@ -1865,9 +1862,9 @@ public class ProcessService {
*/
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
- startTime,
- endTime,
- stateArray);
+ startTime,
+ endTime,
+ stateArray);
}
/**
@@ -2125,10 +2122,10 @@ public class ProcessService {
AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams());
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList().
- stream()
- .filter(t -> t.getId() != 0)
- .map(ResourceInfo::getId)
- .collect(Collectors.toSet());
+ stream()
+ .filter(t -> t.getId() != 0)
+ .map(ResourceInfo::getId)
+ .collect(Collectors.toSet());
}
if (CollectionUtils.isEmpty(resourceIds)) {
return StringUtils.EMPTY;
@@ -2189,7 +2186,7 @@ public class ProcessService {
public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
ProcessData processData, ProcessDefinition processDefinition, Boolean isFromProcessDefine) {
ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(),
- name, processData, project, desc, locations);
+ name, processData, project, desc, locations);
Map<String, TaskDefinition> taskDefinitionMap = handleTaskDefinition(operator, project.getCode(), processData.getTasks(), isFromProcessDefine);
if (Constants.DEFINITION_FAILURE == handleTaskRelation(operator, project.getCode(), processDefinitionLog, processData.getTasks(), taskDefinitionMap)) {
return Constants.DEFINITION_FAILURE;
@@ -2289,33 +2286,33 @@ public class ProcessService {
if (CollectionUtils.isNotEmpty(depList)) {
for (String preTaskName : depList) {
builderRelationList.add(new ProcessTaskRelation(
- StringUtils.EMPTY,
- processDefinition.getVersion(),
- projectCode,
- processDefinition.getCode(),
- taskDefinitionMap.get(preTaskName).getCode(),
- taskDefinitionMap.get(preTaskName).getVersion(),
- taskDefinitionMap.get(taskNode.getName()).getCode(),
- taskDefinitionMap.get(taskNode.getName()).getVersion(),
- ConditionType.NONE,
- StringUtils.EMPTY,
- now,
- now));
- }
- } else {
- builderRelationList.add(new ProcessTaskRelation(
StringUtils.EMPTY,
processDefinition.getVersion(),
projectCode,
processDefinition.getCode(),
- 0L, // this isn't previous task node, set zero
- 0,
+ taskDefinitionMap.get(preTaskName).getCode(),
+ taskDefinitionMap.get(preTaskName).getVersion(),
taskDefinitionMap.get(taskNode.getName()).getCode(),
taskDefinitionMap.get(taskNode.getName()).getVersion(),
ConditionType.NONE,
StringUtils.EMPTY,
now,
now));
+ }
+ } else {
+ builderRelationList.add(new ProcessTaskRelation(
+ StringUtils.EMPTY,
+ processDefinition.getVersion(),
+ projectCode,
+ processDefinition.getCode(),
+ 0L, // this isn't previous task node, set zero
+ 0,
+ taskDefinitionMap.get(taskNode.getName()).getCode(),
+ taskDefinitionMap.get(taskNode.getName()).getVersion(),
+ ConditionType.NONE,
+ StringUtils.EMPTY,
+ now,
+ now));
}
}
for (ProcessTaskRelation processTaskRelation : builderRelationList) {
@@ -2354,9 +2351,9 @@ public class ProcessService {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
Set<Long> processDefinitionCodes = processTaskRelationList
- .stream()
- .map(ProcessTaskRelation::getProcessDefinitionCode)
- .collect(Collectors.toSet());
+ .stream()
+ .map(ProcessTaskRelation::getProcessDefinitionCode)
+ .collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes);
// check process definition is already online
for (ProcessDefinition processDefinition : processDefinitionList) {
@@ -2389,6 +2386,10 @@ public class ProcessService {
*/
public DagData genDagData(ProcessDefinition processDefinition) {
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
+ return new DagData(processDefinition, processTaskRelations, genTaskDefineList(processTaskRelations));
+ }
+
+ private List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelationLog> processTaskRelations) {
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
@@ -2398,8 +2399,7 @@ public class ProcessService {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()));
}
}
- List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
- return new DagData(processDefinition, processTaskRelations, taskDefinitionLogs);
+ return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}
/**
@@ -2467,8 +2467,8 @@ public class ProcessService {
v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
v.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
- taskDefinitionLog.getTimeoutNotifyStrategy(),
- taskDefinitionLog.getTimeout())));
+ taskDefinitionLog.getTimeoutNotifyStrategy(),
+ taskDefinitionLog.getTimeout())));
v.setDelayTime(taskDefinitionLog.getDelayTime());
v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName()));
v.setPreTasks(JSONUtils.toJsonString(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList())));
@@ -2488,7 +2488,7 @@ public class ProcessService {
*/
public List<TaskDefinitionLog> queryTaskDefinitionListByProcess(long processCode, int processVersion) {
List<ProcessTaskRelationLog> processTaskRelationLogs =
- processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
+ processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogs) {
if (processTaskRelationLog.getPreTaskCode() > 0) {
@@ -2532,4 +2532,55 @@ public class ProcessService {
List<Resource> relationResources = CollectionUtils.isNotEmpty(relationResourceIds) ? resourceMapper.queryResourceListById(relationResourceIds) : new ArrayList<>();
ownResources.addAll(relationResources);
}
+
+ /**
+ * Use temporarily before refactoring taskNode: converts task relation logs into TaskNode objects, one per post task code that has a task definition log; the names of its pre tasks are stored in preTasks as a JSON list.
+ */
+ public List<TaskNode> transformTask(List<ProcessTaskRelationLog> taskRelationList) {
+ Map<Long, List<Long>> taskCodeMap = new HashMap<>(); // post task code -> codes of its pre tasks
+ for (ProcessTaskRelationLog processTaskRelation : taskRelationList) {
+ taskCodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
+ if (v == null) {
+ v = new ArrayList<>();
+ }
+ if (processTaskRelation.getPreTaskCode() != 0L) { // a pre task code of 0 is not recorded as a predecessor
+ v.add(processTaskRelation.getPreTaskCode());
+ }
+ return v;
+ });
+ }
+ List<TaskDefinitionLog> taskDefinitionLogs = genTaskDefineList(taskRelationList); // definition logs looked up for the pre/post tasks of these relations
+ Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
+ .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
+ List<TaskNode> taskNodeList = new ArrayList<>();
+ for (Entry<Long, List<Long>> code : taskCodeMap.entrySet()) {
+ TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(code.getKey());
+ if (taskDefinitionLog != null) { // post task codes without a definition log produce no TaskNode
+ TaskNode taskNode = new TaskNode();
+ taskNode.setCode(taskDefinitionLog.getCode());
+ taskNode.setVersion(taskDefinitionLog.getVersion());
+ taskNode.setName(taskDefinitionLog.getName());
+ taskNode.setDesc(taskDefinitionLog.getDescription());
+ taskNode.setType(taskDefinitionLog.getTaskType().toUpperCase());
+ taskNode.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+ taskNode.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
+ taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
+ Map<String, Object> taskParamsMap = taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams()); // conditionResult and dependence are copied onto the node, then removed from the params below
+ taskNode.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT));
+ taskNode.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE));
+ taskParamsMap.remove(Constants.CONDITION_RESULT);
+ taskParamsMap.remove(Constants.DEPENDENCE);
+ taskNode.setParams(JSONUtils.toJsonString(taskParamsMap));
+ taskNode.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
+ taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
+ taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
+ taskDefinitionLog.getTimeoutNotifyStrategy(),
+ taskDefinitionLog.getTimeout())));
+ taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
+ taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getName).collect(Collectors.toList()))); // NOTE(review): no null guard here, unlike the post task lookup above; a pre task code with no definition log would make getName fail — confirm this cannot occur
+ taskNodeList.add(taskNode);
+ }
+ }
+ return taskNodeList;
+ }
}