You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2021/08/04 01:42:52 UTC

[dolphinscheduler] branch json_split_two updated: check has cycle of ProcessDefinition (#5944)

This is an automated email from the ASF dual-hosted git repository.

wenhemin pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split_two by this push:
     new 316b919  check has cycle of ProcessDefinition (#5944)
316b919 is described below

commit 316b919d4f6b100a4129c5f975b7bf24559bde5a
Author: JinyLeeChina <42...@users.noreply.github.com>
AuthorDate: Wed Aug 4 09:42:44 2021 +0800

    check has cycle of ProcessDefinition (#5944)
    
    Co-authored-by: JinyLeeChina <29...@qq.com>
---
 .../api/service/ProcessDefinitionService.java      |   9 -
 .../service/impl/ProcessDefinitionServiceImpl.java |  24 +-
 .../service/process/ProcessService.java            | 255 ++++++++++++---------
 3 files changed, 158 insertions(+), 130 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 5fcf581..a49a08f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -324,14 +324,5 @@ public interface ProcessDefinitionService {
                                                               long projectCode,
                                                               int processDefinitionId,
                                                               long version);
-
-    /**
-     * check has associated process definition
-     *
-     * @param processDefinitionId process definition id
-     * @param version version
-     * @return The query result has a specific process definition return true
-     */
-    boolean checkHasAssociatedProcessDefinition(int processDefinitionId, long version);
 }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 65e2b10..f0fcec3 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -255,12 +255,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 return result;
             }
 
-            // TODO check has cycle
-            // if (graphHasCycle(taskRelationList)) {
-            //  logger.error("process DAG has cycle");
-            //   putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
-            //   return result;
-            // }
+            if (graphHasCycle(processService.transformTask(taskRelationList))) {
+                logger.error("process DAG has cycle");
+                putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
+                return result;
+            }
 
             // check whether the task relation json is normal
             for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
@@ -1324,19 +1323,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     }
 
     /**
-     * check has associated process definition
-     *
-     * @param processDefinitionId process definition id
-     * @param version version
-     * @return The query result has a specific process definition return true
-     */
-    @Override
-    public boolean checkHasAssociatedProcessDefinition(int processDefinitionId, long version) {
-        Integer hasAssociatedDefinitionId = processDefinitionMapper.queryHasAssociatedDefinitionByIdAndVersion(processDefinitionId, version);
-        return Objects.nonNull(hasAssociatedDefinitionId);
-    }
-
-    /**
      * query the pagination versions info by one certain process definition code
      *
      * @param loginUser login user info to check auth
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bd5e4df..ac91b06 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -139,10 +139,10 @@ public class ProcessService {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
-            ExecutionStatus.RUNNING_EXECUTION.ordinal(),
-            ExecutionStatus.DELAY_EXECUTION.ordinal(),
-            ExecutionStatus.READY_PAUSE.ordinal(),
-            ExecutionStatus.READY_STOP.ordinal()};
+        ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+        ExecutionStatus.DELAY_EXECUTION.ordinal(),
+        ExecutionStatus.READY_PAUSE.ordinal(),
+        ExecutionStatus.READY_STOP.ordinal()};
 
     @Autowired
     private UserMapper userMapper;
@@ -526,17 +526,17 @@ public class ProcessService {
         // process instance quit by "waiting thread" state
         if (originCommand == null) {
             Command command = new Command(
-                    CommandType.RECOVER_WAITING_THREAD,
-                    processInstance.getTaskDependType(),
-                    processInstance.getFailureStrategy(),
-                    processInstance.getExecutorId(),
-                    processInstance.getProcessDefinition().getCode(),
-                    JSONUtils.toJsonString(cmdParam),
-                    processInstance.getWarningType(),
-                    processInstance.getWarningGroupId(),
-                    processInstance.getScheduleTime(),
-                    processInstance.getWorkerGroup(),
-                    processInstance.getProcessInstancePriority()
+                CommandType.RECOVER_WAITING_THREAD,
+                processInstance.getTaskDependType(),
+                processInstance.getFailureStrategy(),
+                processInstance.getExecutorId(),
+                processInstance.getProcessDefinition().getCode(),
+                JSONUtils.toJsonString(cmdParam),
+                processInstance.getWarningType(),
+                processInstance.getWarningGroupId(),
+                processInstance.getScheduleTime(),
+                processInstance.getWorkerGroup(),
+                processInstance.getProcessInstancePriority()
             );
             saveCommand(command);
             return;
@@ -617,10 +617,10 @@ public class ProcessService {
 
         // curing global params
         processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-                processDefinition.getGlobalParamMap(),
-                processDefinition.getGlobalParamList(),
-                getCommandTypeIfComplement(processInstance, command),
-                processInstance.getScheduleTime()));
+            processDefinition.getGlobalParamMap(),
+            processDefinition.getGlobalParamList(),
+            getCommandTypeIfComplement(processInstance, command),
+            processInstance.getScheduleTime()));
 
         // set process instance priority
         processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
@@ -646,7 +646,7 @@ public class ProcessService {
         startParamMap.putAll(fatherParamMap);
         // set start param into global params
         if (startParamMap.size() > 0
-                && processDefinition.getGlobalParamMap() != null) {
+            && processDefinition.getGlobalParamMap() != null) {
             for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
                 String val = startParamMap.get(param.getKey());
                 if (val != null) {
@@ -693,8 +693,8 @@ public class ProcessService {
     private Boolean checkCmdParam(Command command, Map<String, String> cmdParam) {
         if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) {
             if (cmdParam == null
-                    || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
-                    || cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
+                || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
+                || cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
                 logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType());
                 return false;
             }
@@ -753,10 +753,10 @@ public class ProcessService {
 
                 // Recalculate global parameters after rerun.
                 processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-                        processDefinition.getGlobalParamMap(),
-                        processDefinition.getGlobalParamList(),
-                        commandTypeIfComplement,
-                        processInstance.getScheduleTime()));
+                    processDefinition.getGlobalParamMap(),
+                    processDefinition.getGlobalParamList(),
+                    commandTypeIfComplement,
+                    processInstance.getScheduleTime()));
                 processInstance.setProcessDefinition(processDefinition);
             }
             //reset command parameter
@@ -804,7 +804,7 @@ public class ProcessService {
                     initTaskInstance(this.findTaskInstanceById(taskId));
                 }
                 cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
-                        String.join(Constants.COMMA, convertIntListToString(failedList)));
+                    String.join(Constants.COMMA, convertIntListToString(failedList)));
                 processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 processInstance.setRunTimes(runTime + 1);
                 break;
@@ -817,7 +817,7 @@ public class ProcessService {
                 cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
                 List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
                 List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
-                        ExecutionStatus.KILL);
+                    ExecutionStatus.KILL);
                 suspendedNodeList.addAll(stopNodeList);
                 for (Integer taskId : suspendedNodeList) {
                     // initialize the pause state
@@ -872,8 +872,6 @@ public class ProcessService {
      * If it is a fault-tolerant command, get the specified version of ProcessDefinition through ProcessInstance
      * Otherwise, get the latest version of ProcessDefinition
      *
-     * @param processDefinitionCode
-     * @param cmdParam
      * @return ProcessDefinition
      */
     private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) {
@@ -894,7 +892,7 @@ public class ProcessService {
                 }
 
                 return processDefineLogMapper.queryByDefinitionCodeAndVersion(
-                        processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
+                    processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
             }
         }
 
@@ -931,14 +929,14 @@ public class ProcessService {
         }
 
         Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE),
-                YYYY_MM_DD_HH_MM_SS);
+            YYYY_MM_DD_HH_MM_SS);
         if (Flag.NO == processInstance.getIsSubProcess()) {
             processInstance.setScheduleTime(startComplementTime);
         }
         processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-                processDefinition.getGlobalParamMap(),
-                processDefinition.getGlobalParamList(),
-                CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
+            processDefinition.getGlobalParamMap(),
+            processDefinition.getGlobalParamList(),
+            CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
 
     }
 
@@ -958,7 +956,7 @@ public class ProcessService {
         Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
         // write sub process id into cmd param.
         if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
-                && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
+            && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
             paramMap.remove(CMD_PARAM_SUB_PROCESS);
             paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
             subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
@@ -971,7 +969,7 @@ public class ProcessService {
             ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
             if (parentInstance != null) {
                 subProcessInstance.setGlobalParams(
-                        joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
+                    joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
                 this.saveProcessInstance(subProcessInstance);
             } else {
                 logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
@@ -1019,7 +1017,7 @@ public class ProcessService {
     private void initTaskInstance(TaskInstance taskInstance) {
 
         if (!taskInstance.isSubProcess()
-                && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
+            && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
             taskInstance.setFlag(Flag.NO);
             updateTaskInstance(taskInstance);
             return;
@@ -1039,12 +1037,12 @@ public class ProcessService {
     public TaskInstance submitTask(TaskInstance taskInstance) {
         ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
         logger.info("start submit task : {}, instance id:{}, state: {}",
-                taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
+            taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
         //submit to db
         TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
         if (task == null) {
             logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
-                    taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
+                taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
             return task;
         }
         if (!task.getState().typeIsFinished()) {
@@ -1052,7 +1050,7 @@ public class ProcessService {
         }
 
         logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {}  ",
-                taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
+            taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
         return task;
     }
 
@@ -1110,7 +1108,7 @@ public class ProcessService {
             }
         }
         logger.info("sub process instance is not found,parent task:{},parent instance:{}",
-                parentTask.getId(), parentProcessInstance.getId());
+            parentTask.getId(), parentProcessInstance.getId());
         return null;
     }
 
@@ -1199,17 +1197,17 @@ public class ProcessService {
         String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
 
         return new Command(
-                commandType,
-                TaskDependType.TASK_POST,
-                parentProcessInstance.getFailureStrategy(),
-                parentProcessInstance.getExecutorId(),
-                processDefinition.getCode(),
-                processParam,
-                parentProcessInstance.getWarningType(),
-                parentProcessInstance.getWarningGroupId(),
-                parentProcessInstance.getScheduleTime(),
-                task.getWorkerGroup(),
-                parentProcessInstance.getProcessInstancePriority()
+            commandType,
+            TaskDependType.TASK_POST,
+            parentProcessInstance.getFailureStrategy(),
+            parentProcessInstance.getExecutorId(),
+            processDefinition.getCode(),
+            processParam,
+            parentProcessInstance.getWarningType(),
+            parentProcessInstance.getWarningGroupId(),
+            parentProcessInstance.getScheduleTime(),
+            task.getWorkerGroup(),
+            parentProcessInstance.getProcessInstancePriority()
         );
     }
 
@@ -1246,7 +1244,7 @@ public class ProcessService {
      */
     private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
         ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
-                parentProcessInstance.getProcessDefinitionVersion());
+            parentProcessInstance.getProcessDefinitionVersion());
         ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode);
         if (childDefinition != null && fatherDefinition != null) {
             childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());
@@ -1269,7 +1267,7 @@ public class ProcessService {
                 taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
             } else {
                 if (processInstanceState != ExecutionStatus.READY_STOP
-                        && processInstanceState != ExecutionStatus.READY_PAUSE) {
+                    && processInstanceState != ExecutionStatus.READY_PAUSE) {
                     // failure task set invalid
                     taskInstance.setFlag(Flag.NO);
                     updateTaskInstance(taskInstance);
@@ -1320,9 +1318,9 @@ public class ProcessService {
         // the task already exists in task queue
         // return state
         if (
-                state == ExecutionStatus.RUNNING_EXECUTION
-                        || state == ExecutionStatus.DELAY_EXECUTION
-                        || state == ExecutionStatus.KILL
+            state == ExecutionStatus.RUNNING_EXECUTION
+                || state == ExecutionStatus.DELAY_EXECUTION
+                || state == ExecutionStatus.KILL
         ) {
             return state;
         }
@@ -1331,7 +1329,7 @@ public class ProcessService {
         if (processInstanceState == ExecutionStatus.READY_PAUSE) {
             state = ExecutionStatus.PAUSE;
         } else if (processInstanceState == ExecutionStatus.READY_STOP
-                || !checkProcessStrategy(taskInstance)) {
+            || !checkProcessStrategy(taskInstance)) {
             state = ExecutionStatus.KILL;
         } else {
             state = ExecutionStatus.SUBMITTED_SUCCESS;
@@ -1355,7 +1353,7 @@ public class ProcessService {
 
         for (TaskInstance task : taskInstances) {
             if (task.getState() == ExecutionStatus.FAILURE
-                    && task.getRetryTimes() >= task.getMaxRetryTimes()) {
+                && task.getRetryTimes() >= task.getMaxRetryTimes()) {
                 return false;
             }
         }
@@ -1455,12 +1453,12 @@ public class ProcessService {
         ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
         // get process define
         ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(),
-                processInstance.getProcessDefinitionVersion());
+            processInstance.getProcessDefinitionVersion());
         taskInstance.setProcessInstance(processInstance);
         taskInstance.setProcessDefine(processDefine);
         TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
-                taskInstance.getTaskCode(),
-                taskInstance.getTaskDefinitionVersion());
+            taskInstance.getTaskCode(),
+            taskInstance.getTaskDefinitionVersion());
         taskInstance.setTaskDefine(taskDefinition);
         return taskInstance;
     }
@@ -1632,7 +1630,6 @@ public class ProcessService {
 
     /**
      * for show in page of taskInstance
-     * @param taskInstance
      */
     public void changeOutParam(TaskInstance taskInstance) {
         if (StringUtils.isEmpty(taskInstance.getVarPool())) {
@@ -1742,7 +1739,7 @@ public class ProcessService {
      */
     public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
         return taskInstanceMapper.queryByHostAndStatus(host,
-                stateArray);
+            stateArray);
     }
 
     /**
@@ -1838,8 +1835,8 @@ public class ProcessService {
      */
     public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
         return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
-                dateInterval.getStartTime(),
-                dateInterval.getEndTime());
+            dateInterval.getStartTime(),
+            dateInterval.getEndTime());
     }
 
     /**
@@ -1851,8 +1848,8 @@ public class ProcessService {
      */
     public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
         return processInstanceMapper.queryLastManualProcess(definitionCode,
-                dateInterval.getStartTime(),
-                dateInterval.getEndTime());
+            dateInterval.getStartTime(),
+            dateInterval.getEndTime());
     }
 
     /**
@@ -1865,9 +1862,9 @@ public class ProcessService {
      */
     public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
         return processInstanceMapper.queryLastRunningProcess(definitionCode,
-                startTime,
-                endTime,
-                stateArray);
+            startTime,
+            endTime,
+            stateArray);
     }
 
     /**
@@ -2125,10 +2122,10 @@ public class ProcessService {
         AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams());
         if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
             resourceIds = params.getResourceFilesList().
-                    stream()
-                    .filter(t -> t.getId() != 0)
-                    .map(ResourceInfo::getId)
-                    .collect(Collectors.toSet());
+                stream()
+                .filter(t -> t.getId() != 0)
+                .map(ResourceInfo::getId)
+                .collect(Collectors.toSet());
         }
         if (CollectionUtils.isEmpty(resourceIds)) {
             return StringUtils.EMPTY;
@@ -2189,7 +2186,7 @@ public class ProcessService {
     public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
                                      ProcessData processData, ProcessDefinition processDefinition, Boolean isFromProcessDefine) {
         ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(),
-                name, processData, project, desc, locations);
+            name, processData, project, desc, locations);
         Map<String, TaskDefinition> taskDefinitionMap = handleTaskDefinition(operator, project.getCode(), processData.getTasks(), isFromProcessDefine);
         if (Constants.DEFINITION_FAILURE == handleTaskRelation(operator, project.getCode(), processDefinitionLog, processData.getTasks(), taskDefinitionMap)) {
             return Constants.DEFINITION_FAILURE;
@@ -2289,33 +2286,33 @@ public class ProcessService {
             if (CollectionUtils.isNotEmpty(depList)) {
                 for (String preTaskName : depList) {
                     builderRelationList.add(new ProcessTaskRelation(
-                            StringUtils.EMPTY,
-                            processDefinition.getVersion(),
-                            projectCode,
-                            processDefinition.getCode(),
-                            taskDefinitionMap.get(preTaskName).getCode(),
-                            taskDefinitionMap.get(preTaskName).getVersion(),
-                            taskDefinitionMap.get(taskNode.getName()).getCode(),
-                            taskDefinitionMap.get(taskNode.getName()).getVersion(),
-                            ConditionType.NONE,
-                            StringUtils.EMPTY,
-                            now,
-                            now));
-                }
-            } else {
-                builderRelationList.add(new ProcessTaskRelation(
                         StringUtils.EMPTY,
                         processDefinition.getVersion(),
                         projectCode,
                         processDefinition.getCode(),
-                        0L, // this isn't previous task node, set zero
-                        0,
+                        taskDefinitionMap.get(preTaskName).getCode(),
+                        taskDefinitionMap.get(preTaskName).getVersion(),
                         taskDefinitionMap.get(taskNode.getName()).getCode(),
                         taskDefinitionMap.get(taskNode.getName()).getVersion(),
                         ConditionType.NONE,
                         StringUtils.EMPTY,
                         now,
                         now));
+                }
+            } else {
+                builderRelationList.add(new ProcessTaskRelation(
+                    StringUtils.EMPTY,
+                    processDefinition.getVersion(),
+                    projectCode,
+                    processDefinition.getCode(),
+                    0L, // this isn't previous task node, set zero
+                    0,
+                    taskDefinitionMap.get(taskNode.getName()).getCode(),
+                    taskDefinitionMap.get(taskNode.getName()).getVersion(),
+                    ConditionType.NONE,
+                    StringUtils.EMPTY,
+                    now,
+                    now));
             }
         }
         for (ProcessTaskRelation processTaskRelation : builderRelationList) {
@@ -2354,9 +2351,9 @@ public class ProcessService {
         List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
         if (!processTaskRelationList.isEmpty()) {
             Set<Long> processDefinitionCodes = processTaskRelationList
-                    .stream()
-                    .map(ProcessTaskRelation::getProcessDefinitionCode)
-                    .collect(Collectors.toSet());
+                .stream()
+                .map(ProcessTaskRelation::getProcessDefinitionCode)
+                .collect(Collectors.toSet());
             List<ProcessDefinition> processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes);
             // check process definition is already online
             for (ProcessDefinition processDefinition : processDefinitionList) {
@@ -2389,6 +2386,10 @@ public class ProcessService {
      */
     public DagData genDagData(ProcessDefinition processDefinition) {
         List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
+        return new DagData(processDefinition, processTaskRelations, genTaskDefineList(processTaskRelations));
+    }
+
+    private List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelationLog> processTaskRelations) {
         Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
         for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
             if (processTaskRelation.getPreTaskCode() > 0) {
@@ -2398,8 +2399,7 @@ public class ProcessService {
                 taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()));
             }
         }
-        List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
-        return new DagData(processDefinition, processTaskRelations, taskDefinitionLogs);
+        return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
     }
 
     /**
@@ -2467,8 +2467,8 @@ public class ProcessService {
             v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
             v.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
             v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
-                    taskDefinitionLog.getTimeoutNotifyStrategy(),
-                    taskDefinitionLog.getTimeout())));
+                taskDefinitionLog.getTimeoutNotifyStrategy(),
+                taskDefinitionLog.getTimeout())));
             v.setDelayTime(taskDefinitionLog.getDelayTime());
             v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName()));
             v.setPreTasks(JSONUtils.toJsonString(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList())));
@@ -2488,7 +2488,7 @@ public class ProcessService {
      */
     public List<TaskDefinitionLog> queryTaskDefinitionListByProcess(long processCode, int processVersion) {
         List<ProcessTaskRelationLog> processTaskRelationLogs =
-                processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
+            processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
         Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
         for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogs) {
             if (processTaskRelationLog.getPreTaskCode() > 0) {
@@ -2532,4 +2532,55 @@ public class ProcessService {
         List<Resource> relationResources = CollectionUtils.isNotEmpty(relationResourceIds) ? resourceMapper.queryResourceListById(relationResourceIds) : new ArrayList<>();
         ownResources.addAll(relationResources);
     }
+
+    /**
+     * Use temporarily before refactoring taskNode
+     */
+    public List<TaskNode> transformTask(List<ProcessTaskRelationLog> taskRelationList) {
+        Map<Long, List<Long>> taskCodeMap = new HashMap<>();
+        for (ProcessTaskRelationLog processTaskRelation : taskRelationList) {
+            taskCodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
+                if (v == null) {
+                    v = new ArrayList<>();
+                }
+                if (processTaskRelation.getPreTaskCode() != 0L) {
+                    v.add(processTaskRelation.getPreTaskCode());
+                }
+                return v;
+            });
+        }
+        List<TaskDefinitionLog> taskDefinitionLogs = genTaskDefineList(taskRelationList);
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
+            .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
+        List<TaskNode> taskNodeList = new ArrayList<>();
+        for (Entry<Long, List<Long>> code : taskCodeMap.entrySet()) {
+            TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(code.getKey());
+            if (taskDefinitionLog != null) {
+                TaskNode taskNode = new TaskNode();
+                taskNode.setCode(taskDefinitionLog.getCode());
+                taskNode.setVersion(taskDefinitionLog.getVersion());
+                taskNode.setName(taskDefinitionLog.getName());
+                taskNode.setDesc(taskDefinitionLog.getDescription());
+                taskNode.setType(taskDefinitionLog.getTaskType().toUpperCase());
+                taskNode.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+                taskNode.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
+                taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
+                Map<String, Object> taskParamsMap = taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
+                taskNode.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT));
+                taskNode.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE));
+                taskParamsMap.remove(Constants.CONDITION_RESULT);
+                taskParamsMap.remove(Constants.DEPENDENCE);
+                taskNode.setParams(JSONUtils.toJsonString(taskParamsMap));
+                taskNode.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
+                taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
+                taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
+                    taskDefinitionLog.getTimeoutNotifyStrategy(),
+                    taskDefinitionLog.getTimeout())));
+                taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
+                taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getName).collect(Collectors.toList())));
+                taskNodeList.add(taskNode);
+            }
+        }
+        return taskNodeList;
+    }
 }