You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/14 06:58:03 UTC

[dolphinscheduler] branch dev updated: [Fix] openapi version inconsistency when updating workflows, tasks and relationship (#13094)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ca5af013ae [Fix] openapi version inconsistency when updating workflows, tasks and relationship (#13094)
ca5af013ae is described below

commit ca5af013ae6a2c9272ece92264060cf5ebbf80f2
Author: insist777 <84...@users.noreply.github.com>
AuthorDate: Wed Dec 14 14:57:57 2022 +0800

    [Fix] openapi version inconsistency when updating workflows, tasks and relationship (#13094)
    
    Co-authored-by: xiangzihao <46...@qq.com>
---
 .../ProcessTaskRelationV2Controller.java           |   3 +-
 .../api/dto/workflow/WorkflowUpdateRequest.java    |   2 -
 .../apache/dolphinscheduler/api/enums/Status.java  |   1 +
 .../api/service/ProcessTaskRelationService.java    |   8 +-
 .../service/impl/ProcessDefinitionServiceImpl.java | 107 +++++++++++++++-
 .../impl/ProcessTaskRelationServiceImpl.java       | 135 +++++++++++++++++++--
 .../service/impl/TaskDefinitionServiceImpl.java    |  15 +--
 .../api/service/ProcessDefinitionServiceTest.java  |  14 ++-
 .../api/service/TaskDefinitionServiceImplTest.java | 108 ++++++++++++++++-
 9 files changed, 360 insertions(+), 33 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java
index 8430651220..b6798e39bf 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java
@@ -125,7 +125,8 @@ public class ProcessTaskRelationV2Controller extends BaseController {
                                                                           @PathVariable("code") Long code,
                                                                           @RequestBody TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
         List<ProcessTaskRelation> processTaskRelations = processTaskRelationService
-                .updateUpstreamTaskDefinition(loginUser, code, taskRelationUpdateUpstreamRequest);
+                .updateUpstreamTaskDefinitionWithSyncDag(loginUser, code, Boolean.TRUE,
+                        taskRelationUpdateUpstreamRequest);
         return Result.success(processTaskRelations);
     }
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java
index 7e3d90d46a..c402285eff 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java
@@ -103,8 +103,6 @@ public class WorkflowUpdateRequest {
             processDefinitionDeepCopy.setLocations(this.location);
         }
 
-        int version = processDefinitionDeepCopy.getVersion() + 1;
-        processDefinitionDeepCopy.setVersion(version);
         processDefinitionDeepCopy.setUpdateTime(new Date());
         return processDefinitionDeepCopy;
     }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index d0f82b3e99..0f1cb95b47 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -381,6 +381,7 @@ public enum Status {
             "批量创建工作流任务关系 {0} 错误"),
     PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error",
             "批量修改工作流任务关系错误"),
+    UPSTREAM_TASK_NOT_EXISTS(50071, "upstream task want to set dependence do not exists {0}", "指定的上游任务 {0} 不存在"),
 
     WORKFLOW_INSTANCE_IS_NOT_FINISHED(50071, "the workflow instance is not finished, can not do this operation",
             "工作流实例未结束,不能执行此操作"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
index 7a31d49916..f5a1f7a082 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
@@ -86,11 +86,13 @@ public interface ProcessTaskRelationService {
      *
      * @param loginUser login user
      * @param taskCode relation upstream code
+     * @param needSyncDag whether to also save a new version of the workflow definition
      * @param taskRelationUpdateUpstreamRequest relation downstream code
      */
-    List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
-                                                           long taskCode,
-                                                           TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest);
+    List<ProcessTaskRelation> updateUpstreamTaskDefinitionWithSyncDag(User loginUser,
+                                                                      long taskCode,
+                                                                      Boolean needSyncDag,
+                                                                      TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest);
 
     /**
      * delete task upstream relation
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 5897807a0f..4b94af1ee7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
+import static java.util.stream.Collectors.toSet;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
@@ -2775,12 +2776,110 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             }
             processDefinitionUpdate.setTenantId(tenant.getId());
         }
-        int update = processDefinitionMapper.updateById(processDefinitionUpdate);
-        if (update <= 0) {
+        int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
+        if (insertVersion == 0) {
+            logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
+                    processDefinitionUpdate.getCode(),
+                    processDefinitionUpdate.getName());
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        this.syncObj2Log(loginUser, processDefinition);
-        return processDefinition;
+
+        int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
+        if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        }
+        logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        processDefinitionUpdate.setVersion(insertVersion);
+        return processDefinitionUpdate;
+    }
+
+    public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+        Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+        processDefinition.setVersion(insertVersion);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinition.setUserId(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinition.setUpdateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int result = processDefinitionMapper.updateById(processDefinition);
+
+        int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
+        processDefinitionLog.setId(processDefinition.getId());
+        return (insertLog & result) > 0 ? insertVersion : 0;
+    }
+
+    public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
+        }
+        Date now = new Date();
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
+            processTaskRelationLog.setProjectCode(projectCode);
+            processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+            processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog preTaskDefinitionLog =
+                        taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (preTaskDefinitionLog != null) {
+                    processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+                }
+                TaskDefinitionLog postTaskDefinitionLog =
+                        taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+                if (postTaskDefinitionLog != null) {
+                    processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+                }
+            }
+            processTaskRelationLog.setCreateTime(now);
+            processTaskRelationLog.setUpdateTime(now);
+            processTaskRelationLog.setOperator(loginUser.getId());
+            processTaskRelationLog.setOperateTime(now);
+        }
+        if (!taskRelations.isEmpty()) {
+            Set<Integer> processTaskRelationSet =
+                    taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+            Set<Integer> taskRelationSet =
+                    taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+            boolean isSame = CollectionUtils.isEqualCollection(processTaskRelationSet,
+                    taskRelationSet);
+            if (isSame) {
+                logger.info("process task relations is non-existent, projectCode:{}, processCode:{}.",
+                        processDefinition.getProjectCode(), processDefinition.getCode());
+                return Constants.EXIT_CODE_SUCCESS;
+            }
+            processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
+        }
+        List<ProcessTaskRelation> processTaskRelations =
+                taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList());
+        int insert = processTaskRelationMapper.batchInsert(processTaskRelations);
+        int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
+        return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
     }
 
     protected Map<String, Object> updateDagSchedule(User loginUser,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index df1b075d9d..4feff61e4a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
+import static java.util.stream.Collectors.toSet;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
@@ -31,12 +32,14 @@ import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.ConditionType;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
@@ -93,6 +96,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
     @Autowired
     private ProcessDefinitionMapper processDefinitionMapper;
 
+    @Autowired
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
     @Autowired
     private ProcessService processService;
 
@@ -402,13 +408,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
      *
      * @param loginUser login user
      * @param taskCode relation upstream code
+     * @param needSyncDag whether to also save a new version of the workflow definition
      * @param taskRelationUpdateUpstreamRequest relation downstream code
      */
     @Override
     @Transactional
-    public List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
-                                                                  long taskCode,
-                                                                  TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
+    public List<ProcessTaskRelation> updateUpstreamTaskDefinitionWithSyncDag(User loginUser,
+                                                                             long taskCode,
+                                                                             Boolean needSyncDag,
+                                                                             TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
         TaskDefinition downstreamTask = taskDefinitionMapper.queryByCode(taskCode);
         if (downstreamTask == null) {
             throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode);
@@ -436,17 +444,24 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR,
                     taskRelationUpdateUpstreamRequest.toString());
         }
-
+        processDefinition.setUpdateTime(new Date());
+        int insertVersion = processDefinition.getVersion();
+        if (needSyncDag) {
+            insertVersion =
+                    this.saveProcessDefine(loginUser, processDefinition);
+            if (insertVersion <= 0) {
+                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            }
+        }
         // get new relation to create and out of date relation to delete
         List<Long> taskCodeCreates = upstreamTaskCodes
                 .stream()
                 .filter(upstreamTaskCode -> processTaskRelationExists.stream().noneMatch(
                         processTaskRelationExist -> processTaskRelationExist.getPreTaskCode() == upstreamTaskCode))
                 .collect(Collectors.toList());
-        List<Long> taskCodeDeletes = processTaskRelationExists
-                .stream()
-                .map(ProcessTaskRelation::getPreTaskCode)
-                .filter(preTaskCode -> !upstreamTaskCodes.contains(preTaskCode))
+        List<Integer> taskCodeDeletes = processTaskRelationExists.stream()
+                .filter(ptr -> !upstreamTaskCodes.contains(ptr.getPreTaskCode()))
+                .map(ProcessTaskRelation::getId)
                 .collect(Collectors.toList());
 
         // delete relation not exists
@@ -460,10 +475,20 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         // create relation not exists
         List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
         for (long createCode : taskCodeCreates) {
-            TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode);
+            long upstreamCode = 0L;
+            int version = 0;
+            if (createCode != 0L) {
+                // code 0 stands for the DAG root and has no task definition, so only non-zero codes are looked up
+                TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode);
+                if (upstreamTask == null) {
+                    throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, createCode);
+                }
+                upstreamCode = upstreamTask.getCode();
+                version = upstreamTask.getVersion();
+            }
             ProcessTaskRelation processTaskRelationCreate =
                     new ProcessTaskRelation(null, processDefinition.getVersion(), downstreamTask.getProjectCode(),
-                            processDefinition.getCode(), upstreamTask.getCode(), upstreamTask.getVersion(),
+                            processDefinition.getCode(), upstreamCode, version,
                             downstreamTask.getCode(), downstreamTask.getVersion(), null, null);
             processTaskRelations.add(processTaskRelationCreate);
         }
@@ -473,10 +498,98 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         }
 
         // batch sync to process task relation log
-        this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations);
+        int saveTaskRelationResult = saveTaskRelation(loginUser, processDefinition, insertVersion);
+        if (saveTaskRelationResult != Constants.EXIT_CODE_SUCCESS) {
+            logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        }
+        logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion);
         return processTaskRelations;
     }
 
+    public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,
+                                int processDefinitionVersion) {
+        long projectCode = processDefinition.getProjectCode();
+        long processDefinitionCode = processDefinition.getCode();
+        List<ProcessTaskRelation> taskRelations =
+                processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+        List<ProcessTaskRelationLog> taskRelationList =
+                taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+        List<Long> taskCodeList =
+                taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+        List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList);
+        List<TaskDefinitionLog> taskDefinitionLogs =
+                taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+        if (taskRelationList.isEmpty()) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
+        }
+        Date now = new Date();
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
+            processTaskRelationLog.setProjectCode(projectCode);
+            processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+            processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog preTaskDefinitionLog =
+                        taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (preTaskDefinitionLog != null) {
+                    processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+                }
+                TaskDefinitionLog postTaskDefinitionLog =
+                        taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+                if (postTaskDefinitionLog != null) {
+                    processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+                }
+            }
+            processTaskRelationLog.setCreateTime(now);
+            processTaskRelationLog.setUpdateTime(now);
+            processTaskRelationLog.setOperator(loginUser.getId());
+            processTaskRelationLog.setOperateTime(now);
+        }
+        if (CollectionUtils.isNotEmpty(taskRelations)) {
+            Set<Integer> processTaskRelationSet =
+                    taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+            Set<Integer> taskRelationSet =
+                    taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+            boolean isSame = org.apache.commons.collections.CollectionUtils.isEqualCollection(processTaskRelationSet,
+                    taskRelationSet);
+            if (isSame) {
+                return Constants.EXIT_CODE_SUCCESS;
+            }
+            processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
+        }
+        List<ProcessTaskRelation> processTaskRelations =
+                taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList());
+        int insert = processTaskRelationMapper.batchInsert(processTaskRelations);
+        int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
+        return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
+    }
+
+    public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+        Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+        int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
+        processDefinitionLog.setVersion(insertVersion);
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+        processDefinitionLog.setId(null);
+        int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
+
+        processDefinitionLog.setId(processDefinition.getId());
+        int result = processDefinitionMapper.updateById(processDefinitionLog);
+        return (insertLog & result) > 0 ? insertVersion : 0;
+    }
+
     private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition,
                                 List<ProcessTaskRelation> processTaskRelationList) {
         List<ProcessTaskRelationLog> relationLogs =
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index b641d1d7f4..9abbb07253 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -214,8 +214,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                                                           String upstreamCodes) {
         TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest = new TaskRelationUpdateUpstreamRequest();
         taskRelationUpdateUpstreamRequest.setWorkflowCode(workflowCode);
-        taskRelationUpdateUpstreamRequest.setUpstreams(upstreamCodes);
-        return processTaskRelationService.updateUpstreamTaskDefinition(user, taskCode,
+        if (upstreamCodes != null) {
+            taskRelationUpdateUpstreamRequest.setUpstreams(upstreamCodes);
+        }
+        return processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(user, taskCode, Boolean.FALSE,
                 taskRelationUpdateUpstreamRequest);
     }
 
@@ -498,9 +500,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         }
     }
 
-    private void updateDag(User loginUser, long processDefinitionCode,
-                           List<ProcessTaskRelation> processTaskRelationList,
-                           List<TaskDefinitionLog> taskDefinitionLogs) {
+    public void updateDag(User loginUser, long processDefinitionCode,
+                          List<ProcessTaskRelation> processTaskRelationList,
+                          List<TaskDefinitionLog> taskDefinitionLogs) {
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
         if (processDefinition == null) {
             logger.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode);
@@ -625,6 +627,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
 
         List<ProcessTaskRelation> taskRelationList =
                 processTaskRelationMapper.queryUpstreamByCode(taskDefinitionUpdate.getProjectCode(), taskCode);
+
         if (CollectionUtils.isNotEmpty(taskRelationList)) {
             logger.info(
                     "Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.",
@@ -634,10 +637,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                     .queryByProcessCode(taskDefinitionUpdate.getProjectCode(), processDefinitionCode);
             updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog));
         }
-
         this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(),
                 taskUpdateRequest.getUpstreamTasksCodes());
-
         return taskDefinitionUpdate;
     }
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 8b600696b7..5ada289227 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -1068,11 +1068,23 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
                 ((ServiceException) exception).getCode());
 
         // success
+        Mockito.when(processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()))
+                .thenReturn(processDefinition.getVersion());
         Mockito.when(processDefinitionMapper.updateById(isA(ProcessDefinition.class))).thenReturn(1);
         ProcessDefinition processDefinitionUpdate =
                 processDefinitionService.updateSingleProcessDefinition(user, processDefinitionCode,
                         workflowUpdateRequest);
-        Assertions.assertEquals(processDefinition, processDefinitionUpdate);
+        Assertions.assertNotNull(processDefinitionUpdate);
+
+        // check version
+        Assertions.assertEquals(processDefinition.getVersion() + 1, processDefinitionUpdate.getVersion());
+    }
+
+    @Test
+    public void testCheckVersion() {
+        WorkflowFilterRequest workflowFilterRequest = new WorkflowFilterRequest();
+        workflowFilterRequest.setWorkflowName(name);
+
     }
 
     @Test
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index b6569a1e67..b6668dff15 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isA;
 
 import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
@@ -40,18 +41,23 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.process.ProcessServiceImpl;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import java.text.MessageFormat;
@@ -91,9 +97,18 @@ public class TaskDefinitionServiceImplTest {
     @Mock
     private ProjectServiceImpl projectService;
 
+    @InjectMocks
+    private ProcessServiceImpl processServiceImpl;
+
     @Mock
     private ProcessService processService;
 
+    @Mock
+    private ProcessDefinitionLogMapper processDefineLogMapper;
+
+    @Mock
+    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+
     @Mock
     private ProcessTaskRelationMapper processTaskRelationMapper;
 
@@ -429,11 +444,15 @@ public class TaskDefinitionServiceImplTest {
         // success
         Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
         // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
-        Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class),
-                isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList());
+        Mockito.when(
+                processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
+                        isA(Boolean.class),
+                        isA(TaskRelationUpdateUpstreamRequest.class)))
+                .thenReturn(getProcessTaskRelationList());
         Mockito.when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class),
                 isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition());
         Assertions.assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
+
     }
 
     @Test
@@ -500,10 +519,54 @@ public class TaskDefinitionServiceImplTest {
         // success
         Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
         // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
-        Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class),
-                isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList());
+        Mockito.when(
+                processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
+                        isA(Boolean.class),
+                        isA(TaskRelationUpdateUpstreamRequest.class)))
+                .thenReturn(getProcessTaskRelationList());
         Assertions.assertDoesNotThrow(
                 () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
+
+        TaskDefinition taskDefinition =
+                taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest);
+        Assertions.assertEquals(getTaskDefinition().getVersion() + 1, taskDefinition.getVersion());
+    }
+
+    @Test
+    public void testUpdateDag() {
+        User loginUser = getLoginUser();
+        ProcessDefinition processDefinition = getProcessDefinition();
+        processDefinition.setId(null);
+        List<ProcessTaskRelation> processTaskRelationList = getProcessTaskRelationList();
+        TaskDefinitionLog taskDefinitionLog = getTaskDefinitionLog();
+        ArrayList<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
+        taskDefinitionLogs.add(taskDefinitionLog);
+        Integer version = 1;
+        Mockito.when(processDefinitionMapper.queryByCode(isA(long.class))).thenReturn(processDefinition);
+
+        // saveProcessDefine
+        Mockito.when(processDefineLogMapper.queryMaxVersionForDefinition(isA(long.class))).thenReturn(version);
+        Mockito.when(processDefineLogMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1);
+        Mockito.when(processDefinitionMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1);
+        int insertVersion =
+                processServiceImpl.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+        Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE))
+                .thenReturn(insertVersion);
+        Assertions.assertEquals(insertVersion, version + 1);
+
+        // saveTaskRelation
+        List<ProcessTaskRelationLog> processTaskRelationLogList = getProcessTaskRelationLogList();
+        Mockito.when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getProjectCode()),
+                eq(processDefinition.getCode()))).thenReturn(processTaskRelationList);
+        Mockito.when(processTaskRelationMapper.batchInsert(isA(List.class))).thenReturn(1);
+        Mockito.when(processTaskRelationLogMapper.batchInsert(isA(List.class))).thenReturn(1);
+        int insertResult = processServiceImpl.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
+                processDefinition.getCode(), insertVersion, processTaskRelationLogList, taskDefinitionLogs,
+                Boolean.TRUE);
+        Assertions.assertEquals(Constants.EXIT_CODE_SUCCESS, insertResult);
+        Assertions.assertDoesNotThrow(
+                () -> taskDefinitionService.updateDag(loginUser, processDefinition.getCode(), processTaskRelationList,
+                        taskDefinitionLogs));
     }
 
     @Test
@@ -528,6 +591,17 @@ public class TaskDefinitionServiceImplTest {
         Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE));
     }
 
+    /**
+     * create a login user of type GENERAL_USER (user name "admin", id 1)
+     */
+    private User getLoginUser() {
+        User loginUser = new User();
+        loginUser.setUserType(UserType.GENERAL_USER);
+        loginUser.setUserName("admin");
+        loginUser.setId(1);
+        return loginUser;
+    }
+
     /**
      * get mock Project
      *
@@ -563,6 +637,19 @@ public class TaskDefinitionServiceImplTest {
         return taskDefinition;
     }
 
+    private TaskDefinitionLog getTaskDefinitionLog() {
+        TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
+        taskDefinitionLog.setProjectCode(PROJECT_CODE);
+        taskDefinitionLog.setCode(TASK_CODE);
+        taskDefinitionLog.setVersion(VERSION);
+        taskDefinitionLog.setTaskType("SHELL");
+        taskDefinitionLog.setTaskParams(TASK_PARAMETER);
+        taskDefinitionLog.setFlag(Flag.YES);
+        taskDefinitionLog.setCpuQuota(RESOURCE_RATE);
+        taskDefinitionLog.setMemoryMax(RESOURCE_RATE);
+        return taskDefinitionLog;
+    }
+
     private List<ProcessTaskRelation> getProcessTaskRelationList() {
         List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
 
@@ -576,6 +663,19 @@ public class TaskDefinitionServiceImplTest {
         return processTaskRelationList;
     }
 
+    private List<ProcessTaskRelationLog> getProcessTaskRelationLogList() {
+        List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
+
+        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
+        processTaskRelationLog.setProjectCode(PROJECT_CODE);
+        processTaskRelationLog.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+        processTaskRelationLog.setPreTaskCode(TASK_CODE);
+        processTaskRelationLog.setPostTaskCode(TASK_CODE + 1L);
+
+        processTaskRelationLogList.add(processTaskRelationLog);
+        return processTaskRelationLogList;
+    }
+
     private List<ProcessTaskRelation> getProcessTaskRelationList2() {
         List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();