You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/19 09:14:51 UTC

[dolphinscheduler] branch 2.0.3-prepare updated: [Bug] [dolphinscheduler] fix duplicate key (#8111)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 2.0.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.3-prepare by this push:
     new 6bf8063  [Bug] [dolphinscheduler] fix duplicate key (#8111)
6bf8063 is described below

commit 6bf8063fe8b8491d33412452dfd58f81ab88c7c8
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Wed Jan 19 17:14:41 2022 +0800

    [Bug] [dolphinscheduler] fix duplicate key (#8111)
    
    * fix #8043
    
    * fix null when no change
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../service/impl/ProcessDefinitionServiceImpl.java | 52 ++++++++++-------
 .../impl/ProcessTaskRelationServiceImpl.java       | 65 ++++++++++++++++++++--
 .../service/impl/TaskDefinitionServiceImpl.java    | 47 ++++++++++++----
 .../api/service/ProcessDefinitionServiceTest.java  |  1 +
 .../service/ProcessTaskRelationServiceTest.java    | 17 +++++-
 .../service/process/ProcessService.java            | 33 ++++++++++-
 6 files changed, 176 insertions(+), 39 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index dc76849..cfbeb25 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -577,29 +577,43 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
         }
-        int insertVersion;
-        if (processDefinition.equals(processDefinitionDeepCopy)) {
-            insertVersion = processDefinitionDeepCopy.getVersion();
-            ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(processDefinition.getCode(), insertVersion);
-            processDefinitionLog.setOperator(loginUser.getId());
-            processDefinitionLog.setOperateTime(new Date());
-            int update = processDefinitionLogMapper.updateById(processDefinitionLog);
+        boolean isChange = false;
+        if (processDefinition.equals(processDefinitionDeepCopy) && saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
+            List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
+            if (taskRelationList.size() == processTaskRelationLogList.size()) {
+                Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
+                    taskRelationList.stream().collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode, processTaskRelationLog -> processTaskRelationLog));
+                for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
+                    if (!processTaskRelationLog.equals(taskRelationLogMap.get(processTaskRelationLog.getPostTaskCode()))) {
+                        isChange = true;
+                        break;
+                    }
+                }
+            } else {
+                isChange = true;
+            }
         } else {
-            processDefinition.setUpdateTime(new Date());
-            insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
-        }
-        if (insertVersion == 0) {
-            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
-            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            isChange = true;
         }
-        int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
-            processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
-        if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+        if (isChange) {
+            processDefinition.setUpdateTime(new Date());
+            int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+            if (insertVersion <= 0) {
+                putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            }
+            int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
+                processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
+            if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+                putMsg(result, Status.SUCCESS);
+                result.put(Constants.DATA_LIST, processDefinition);
+            } else {
+                putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            }
+        } else {
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
-        } else {
-            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
-            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
         return result;
     }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index b49a089..a53bbe7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -88,6 +89,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
     @Autowired
     private ProcessDefinitionMapper processDefinitionMapper;
 
+    @Autowired
+    private ProcessService processService;
+
     /**
      * create process task relation
      *
@@ -135,6 +139,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
                 }
             }
         }
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         Date now = new Date();
         List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
         if (preTaskCode != 0L) {
@@ -268,6 +273,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
                 return result;
             }
         }
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         Date now = new Date();
         ProcessTaskRelation processTaskRelation = upstreamList.get(0);
         ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation);
@@ -290,6 +296,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         return result;
     }
 
+    private void updateProcessDefiniteVersion(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition) {
+        int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+        if (insertVersion <= 0) {
+            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+        }
+        processDefinition.setVersion(insertVersion);
+    }
+
     /**
      * delete process task relation
      *
@@ -331,7 +346,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
             return result;
         }
-
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
         processTaskRelationLog.setProjectCode(projectCode);
         processTaskRelationLog.setPostTaskCode(taskCode);
@@ -379,7 +394,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
             return result;
         }
-        Status status = deleteUpstreamRelation(loginUser.getId(), projectCode,
+        Status status = deleteUpstreamRelation(loginUser, projectCode,
             Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new), taskCode);
         if (status != Status.SUCCESS) {
             putMsg(result, status);
@@ -417,11 +432,23 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         Set<Long> postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
         int delete = 0;
         int deleteLog = 0;
+        Set<Long> processCodeSet = new HashSet<>();
         for (long postTaskCode : postTaskCodesSet) {
             ProcessTaskRelationLog processTaskRelationLog = taskRelationLogMap.get(postTaskCode);
             if (processTaskRelationLog != null) {
                 delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
                 deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+                processCodeSet.add(processTaskRelationLog.getProcessDefinitionCode());
+            }
+        }
+        for (long code : processCodeSet) {
+            ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
+            if (processDefinition == null) {
+                throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
+            }
+            int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+            if (insertVersion <= 0) {
+                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
             }
         }
         if ((delete & deleteLog) == 0) {
@@ -514,6 +541,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
      * @param postTaskCode          post task code
      * @return delete result code
      */
+    @Transactional(rollbackFor = RuntimeException.class)
     @Override
     public Map<String, Object> deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode) {
         Project project = projectMapper.queryByCode(projectCode);
@@ -522,6 +550,11 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
+            return result;
+        }
         List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode);
         if (CollectionUtils.isEmpty(processTaskRelationList)) {
             putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
@@ -546,11 +579,22 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             }
             return result;
         }
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
+        processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
         processTaskRelation.setPreTaskVersion(0);
         processTaskRelation.setPreTaskCode(0L);
+        Date now = new Date();
+        processTaskRelation.setUpdateTime(now);
         int update = processTaskRelationMapper.updateById(processTaskRelation);
-        if (update == 0) {
+        processTaskRelation.setId(0);
+        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
+        processTaskRelationLog.setCreateTime(now);
+        processTaskRelationLog.setOperator(loginUser.getId());
+        processTaskRelationLog.setOperateTime(now);
+        int insert = processTaskRelationLogMapper.insert(processTaskRelationLog);
+        if ((update & insert) == 0) {
             putMsg(result, Status.DELETE_EDGE_ERROR);
+            throw new ServiceException(Status.DELETE_EDGE_ERROR);
         }
         return result;
     }
@@ -592,7 +636,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
      * @param taskCode     pre task code
      * @return status
      */
-    private Status deleteUpstreamRelation(int userId, long projectCode, Long[] preTaskCodes, long taskCode) {
+    private Status deleteUpstreamRelation(User loginUser, long projectCode, Long[] preTaskCodes, long taskCode) {
         List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes);
         if (CollectionUtils.isEmpty(upstreamList)) {
             return Status.SUCCESS;
@@ -601,7 +645,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         Date now = new Date();
         for (ProcessTaskRelation processTaskRelation : upstreamList) {
             ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
-            processTaskRelationLog.setOperator(userId);
+            processTaskRelationLog.setOperator(loginUser.getId());
             processTaskRelationLog.setOperateTime(now);
             processTaskRelationLog.setUpdateTime(now);
             upstreamLogList.add(processTaskRelationLog);
@@ -617,14 +661,23 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         for (Map<String, Long> codeCountMap : countListGroupByProcessDefinitionCode) {
             long processDefinitionCode = codeCountMap.get("processDefinitionCode");
             long countValue = codeCountMap.get("countValue");
+            ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+            if (processDefinition == null) {
+                return Status.PROCESS_DEFINE_NOT_EXIST;
+            }
+            int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+            if (insertVersion <= 0) {
+                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            }
             List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
             if (countValue <= processTaskRelationLogList.size()) {
                 ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogList.remove(0);
                 if (processTaskRelationLog.getPreTaskCode() != 0) {
                     processTaskRelationLog.setPreTaskCode(0);
                     processTaskRelationLog.setPreTaskVersion(0);
-                    updates.add(processTaskRelationLog);
                 }
+                processTaskRelationLog.setProcessDefinitionVersion(insertVersion);
+                updates.add(processTaskRelationLog);
             }
             if (!processTaskRelationLogList.isEmpty()) {
                 deletes.addAll(processTaskRelationLogList);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 1c52cd1..749ebac 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -31,12 +31,14 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@@ -95,6 +97,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
     private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
 
     @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Autowired
     private ProcessService processService;
 
     @Autowired
@@ -178,6 +183,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
      * @param projectCode project code
      * @param taskCode task code
      */
+    @Transactional(rollbackFor = RuntimeException.class)
     @Override
     public Map<String, Object> deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) {
         Project project = projectMapper.queryByCode(projectCode);
@@ -213,23 +219,35 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
             if (!taskRelationList.isEmpty()) {
                 int deleteRelation = 0;
-                int deleteRelationLog = 0;
                 for (ProcessTaskRelation processTaskRelation : taskRelationList) {
-                    ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
-                    deleteRelation += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                    deleteRelationLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+                    deleteRelation += processTaskRelationMapper.deleteById(processTaskRelation.getId());
                 }
-                if ((deleteRelation & deleteRelationLog) == 0) {
+                if (deleteRelation == 0) {
                     throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
                 }
+                long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
+                updateProcessDefiniteVersion(loginUser, processDefinitionCode);
             }
             putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
+            throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
         }
         return result;
     }
 
+    private int updateProcessDefiniteVersion(User loginUser, long processDefinitionCode) {
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+        if (processDefinition == null) {
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
+        }
+        int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+        if (insertVersion <= 0) {
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+        }
+        return insertVersion;
+    }
+
     /**
      * update task definition
      *
@@ -290,27 +308,35 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
         }
+        handleRelation(loginUser, taskCode, version, now);
+        result.put(Constants.DATA_LIST, taskCode);
+        putMsg(result, Status.SUCCESS, update);
+        return result;
+    }
+
+    private void handleRelation(User loginUser, long taskCode, Integer version, Date now) {
         List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
         if (!processTaskRelationList.isEmpty()) {
+            long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode();
+            int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode);
             List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
             int delete = 0;
-            int deleteLog = 0;
             for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
                 ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
                 delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
                 if (processTaskRelationLog.getPreTaskCode() == taskCode) {
                     processTaskRelationLog.setPreTaskVersion(version);
                 }
                 if (processTaskRelationLog.getPostTaskCode() == taskCode) {
                     processTaskRelationLog.setPostTaskVersion(version);
                 }
+                processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
                 processTaskRelationLog.setOperator(loginUser.getId());
                 processTaskRelationLog.setOperateTime(now);
                 processTaskRelationLog.setUpdateTime(now);
                 processTaskRelationLogList.add(processTaskRelationLog);
             }
-            if ((delete & deleteLog) == 0) {
+            if (delete == 0) {
                 throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
             } else {
                 int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
@@ -320,9 +346,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                 }
             }
         }
-        result.put(Constants.DATA_LIST, taskCode);
-        putMsg(result, Status.SUCCESS, update);
-        return result;
     }
 
     /**
@@ -352,10 +375,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         }
         TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
         taskDefinitionUpdate.setUserId(loginUser.getId());
+        Date now = new Date();
         taskDefinitionUpdate.setUpdateTime(new Date());
         taskDefinitionUpdate.setId(taskDefinition.getId());
         int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate);
         if (switchVersion > 0) {
+            handleRelation(loginUser, taskCode, version, now);
             result.put(Constants.DATA_LIST, taskCode);
             putMsg(result, Status.SUCCESS);
         } else {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index c06dc23..497c3d3 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -667,6 +667,7 @@ public class ProcessDefinitionServiceTest {
         processDefinition.setTenantId(1);
         processDefinition.setDescription("");
         processDefinition.setCode(46L);
+        processDefinition.setVersion(1);
         return processDefinition;
     }
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
index 37a4bb1..5db7868 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.commons.collections.CollectionUtils;
 
@@ -88,6 +89,9 @@ public class ProcessTaskRelationServiceTest {
     @Mock
     private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
 
+    @Mock
+    private ProcessService processService;
+
     /**
      * get Mock Admin User
      *
@@ -456,6 +460,9 @@ public class ProcessTaskRelationServiceTest {
         ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
         Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
         Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
+        ProcessDefinition processDefinition = getProcessDefinition();
+        Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(processDefinition);
+        Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
         Map<String, Object> result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode);
         Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
     }
@@ -541,6 +548,7 @@ public class ProcessTaskRelationServiceTest {
         processTaskRelationLog.setProcessDefinitionVersion(1);
         Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
         Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
+        Mockito.when(processService.saveProcessDefine(loginUser, getProcessDefinition(), Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
         result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
@@ -549,7 +557,7 @@ public class ProcessTaskRelationServiceTest {
     public void testDeleteEdge() {
         long projectCode = 1L;
         long processDefinitionCode = 3L;
-        long preTaskCode = 4L;
+        long preTaskCode = 0L;
         long postTaskCode = 5L;
         Project project = getProject(projectCode);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
@@ -563,14 +571,21 @@ public class ProcessTaskRelationServiceTest {
         ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
         processTaskRelation.setProjectCode(projectCode);
         processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
+        processTaskRelation.setProcessDefinitionVersion(1);
         processTaskRelation.setPreTaskCode(preTaskCode);
         processTaskRelation.setPostTaskCode(postTaskCode);
+        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
+        processTaskRelationLog.setOperator(loginUser.getId());
         List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
         processTaskRelationList.add(processTaskRelation);
         Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(processTaskRelationList);
         Mockito.when(processTaskRelationMapper.countByCode(projectCode, processDefinitionCode, 0L, postTaskCode)).thenReturn(1);
         Mockito.when(processTaskRelationMapper.deleteById(processTaskRelation.getId())).thenReturn(1);
         Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
+        Mockito.when(processTaskRelationLogMapper.insert(processTaskRelationLog)).thenReturn(1);
+        ProcessDefinition processDefinition = getProcessDefinition();
+        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
+        Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
         result = processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bc39ca3..c214723 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2184,7 +2184,7 @@ public class ProcessService {
         if (result > 0) {
             result = switchProcessTaskRelationVersion(processDefinitionLog);
             if (result <= 0) {
-                return Constants.DEFINITION_FAILURE;
+                return Constants.EXIT_CODE_FAILURE;
             }
         }
         return result;
@@ -2196,7 +2196,36 @@ public class ProcessService {
             processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode());
         }
         List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
-        return processTaskRelationMapper.batchInsert(processTaskRelationLogList);
+        int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
+        if (batchInsert == 0) {
+            return Constants.EXIT_CODE_FAILURE;
+        } else {
+            int result = 0;
+            for (ProcessTaskRelationLog taskRelationLog : processTaskRelationLogList) {
+                int switchResult = switchTaskDefinitionVersion(taskRelationLog.getPostTaskCode(), taskRelationLog.getPostTaskVersion());
+                if (switchResult != Constants.EXIT_CODE_FAILURE) {
+                    result++;
+                }
+            }
+            return result;
+        }
+    }
+
+    public int switchTaskDefinitionVersion(long taskCode, int taskVersion) {
+        TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
+        if (taskDefinition == null) {
+            return Constants.EXIT_CODE_FAILURE;
+        }
+        if (taskDefinition.getVersion() == taskVersion) {
+            return Constants.EXIT_CODE_SUCCESS;
+        }
+        TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskVersion);
+        if (taskDefinitionUpdate == null) {
+            return Constants.EXIT_CODE_FAILURE;
+        }
+        taskDefinitionUpdate.setUpdateTime(new Date());
+        taskDefinitionUpdate.setId(taskDefinition.getId());
+        return taskDefinitionMapper.updateById(taskDefinitionUpdate);
     }
 
     /**