You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/12/05 08:41:10 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #13094: [Fix] Fix the version inconsistency when updating workflows, tasks and relationships in openapi.

caishunfeng commented on code in PR #13094:
URL: https://github.com/apache/dolphinscheduler/pull/13094#discussion_r1039286888


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java:
##########
@@ -97,7 +97,7 @@ public class TaskCreateRequest {
     private Integer memoryMax;
 
     @Schema(example = "upstream-task-codes1,upstream-task-codes2", description = "use , to split multiple upstream task codes")
-    private String upstreamTasksCodes;
+    private String upstreamTasksCodes = "0";

Review Comment:
   What's the default value mean?



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }

Review Comment:
   ```suggestion
           } 
          logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
                       processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+        Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);

Review Comment:
   Please use two different object to do it.
   
   Why need to update the old one with new object?



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java:
##########
@@ -90,6 +90,7 @@ void deleteTaskProcessRelationV2(User loginUser,
      */
     List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
                                                            long taskCode,
+                                                           boolean needSyncDag,

Review Comment:
   It's better to create a new method like `updateUpstreamTaskDefinitionWithSyncDag`, WDYT?



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+        Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);
+        return (insertLog & result) > 0 ? insertVersion : 0;
+    }
+
+    public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
+        }
+        Date now = new Date();
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
+            processTaskRelationLog.setProjectCode(projectCode);
+            processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+            processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog preTaskDefinitionLog =
+                        taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (preTaskDefinitionLog != null) {
+                    processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+                }
+                TaskDefinitionLog postTaskDefinitionLog =
+                        taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+                if (postTaskDefinitionLog != null) {
+                    processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+                }
+            }
+            processTaskRelationLog.setCreateTime(now);
+            processTaskRelationLog.setUpdateTime(now);
+            processTaskRelationLog.setOperator(loginUser.getId());
+            processTaskRelationLog.setOperateTime(now);
+        }
+        if (!taskRelations.isEmpty()) {
+            Set<Integer> processTaskRelationSet =
+                    taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+            Set<Integer> taskRelationSet =
+                    taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+            boolean result = org.apache.commons.collections.CollectionUtils.isEqualCollection(processTaskRelationSet,
+                    taskRelationSet);
+            if (result) {
+                return Constants.EXIT_CODE_SUCCESS;

Review Comment:
   add some log here.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +496,99 @@ public List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations);
+        int saveTaskRelation = saveTaskRelation(loginUser, processDefinition, insertVersion);

Review Comment:
   ```suggestion
           int saveTaskRelationResult = saveTaskRelation(loginUser, processDefinition, insertVersion);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +496,99 @@ public List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations);
+        int saveTaskRelation = saveTaskRelation(loginUser, processDefinition, insertVersion);
+        if (saveTaskRelation != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }
+        processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion);
         return processTaskRelations;
     }
 
+    public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,

Review Comment:
   It seems just copy the function from `processService`, but not delete the old one.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +496,99 @@ public List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations);
+        int saveTaskRelation = saveTaskRelation(loginUser, processDefinition, insertVersion);
+        if (saveTaskRelation != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }
+        processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion);
         return processTaskRelations;
     }
 
+    public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
+        }
+        Date now = new Date();
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
+            processTaskRelationLog.setProjectCode(projectCode);
+            processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+            processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog preTaskDefinitionLog =
+                        taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (preTaskDefinitionLog != null) {
+                    processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+                }
+                TaskDefinitionLog postTaskDefinitionLog =
+                        taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+                if (postTaskDefinitionLog != null) {
+                    processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+                }
+            }
+            processTaskRelationLog.setCreateTime(now);
+            processTaskRelationLog.setUpdateTime(now);
+            processTaskRelationLog.setOperator(loginUser.getId());
+            processTaskRelationLog.setOperateTime(now);
+        }
+        if (!taskRelations.isEmpty()) {

Review Comment:
   ```suggestion
           if (CollectionUtils.isNotEmpty(taskRelations)) {
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +496,99 @@ public List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations);
+        int saveTaskRelation = saveTaskRelation(loginUser, processDefinition, insertVersion);
+        if (saveTaskRelation != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }

Review Comment:
   ```suggestion
           } 
           logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
                   processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -473,10 +496,99 @@ public List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations);
+        int saveTaskRelation = saveTaskRelation(loginUser, processDefinition, insertVersion);
+        if (saveTaskRelation != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }
+        processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion);

Review Comment:
   Why set workflow version here? Just for the first one?



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+        Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);
+        return (insertLog & result) > 0 ? insertVersion : 0;
+    }
+
+    public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
+        }
+        Date now = new Date();
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
+            processTaskRelationLog.setProjectCode(projectCode);
+            processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+            processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog preTaskDefinitionLog =
+                        taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (preTaskDefinitionLog != null) {
+                    processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+                }
+                TaskDefinitionLog postTaskDefinitionLog =
+                        taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+                if (postTaskDefinitionLog != null) {
+                    processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+                }
+            }
+            processTaskRelationLog.setCreateTime(now);
+            processTaskRelationLog.setUpdateTime(now);
+            processTaskRelationLog.setOperator(loginUser.getId());
+            processTaskRelationLog.setOperateTime(now);
+        }
+        if (!taskRelations.isEmpty()) {
+            Set<Integer> processTaskRelationSet =
+                    taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+            Set<Integer> taskRelationSet =
+                    taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+            boolean result = org.apache.commons.collections.CollectionUtils.isEqualCollection(processTaskRelationSet,

Review Comment:
   ```suggestion
               boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet,
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition updateSingleProcessDefinition(User loginUser,
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        } else {
+            logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+    public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+        Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);
+        return (insertLog & result) > 0 ? insertVersion : 0;
+    }
+
+    public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) {

Review Comment:
   ```suggestion
           if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org