You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/25 12:17:39 UTC

[dolphinscheduler] branch 2.0.4-prepare updated: fix relation delete (#8190)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 2.0.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.4-prepare by this push:
     new a24a816  fix relation delete (#8190)
a24a816 is described below

commit a24a81685aaddbf86ad6388e9f2e31b5ce404b14
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Tue Jan 25 20:17:33 2022 +0800

    fix relation delete (#8190)
---
 .../controller/ProcessTaskRelationController.java  |  44 +--
 .../api/service/ProcessTaskRelationService.java    |  16 -
 .../impl/ProcessTaskRelationServiceImpl.java       | 382 ++++++---------------
 .../service/impl/TaskDefinitionServiceImpl.java    |   4 +-
 .../service/ProcessTaskRelationServiceTest.java    | 131 ++-----
 5 files changed, 149 insertions(+), 428 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
index 54887b3..710c8ac 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
@@ -102,46 +102,6 @@ public class ProcessTaskRelationController extends BaseController {
     }
 
     /**
-     * move task to other processDefinition
-     *
-     * @param loginUser login user info
-     * @param projectCode project code
-     * @param processDefinitionCode process definition code
-     * @param targetProcessDefinitionCode target process definition code
-     * @param taskCode the current task code (the post task code)
-     * @return move result code
-     */
-    @ApiOperation(value = "moveRelation", notes = "MOVE_TASK_TO_OTHER_PROCESS_DEFINITION_NOTES")
-    @ApiImplicitParams({
-        @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
-        @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, type = "Long"),
-        @ApiImplicitParam(name = "targetProcessDefinitionCode", value = "TARGET_PROCESS_DEFINITION_CODE", required = true, type = "Long"),
-        @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long")
-    })
-    @PostMapping(value = "/move")
-    @ResponseStatus(HttpStatus.OK)
-    @ApiException(MOVE_PROCESS_TASK_RELATION_ERROR)
-    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
-    public Result moveTaskProcessRelation(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                          @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
-                                          @RequestParam(name = "processDefinitionCode", required = true) long processDefinitionCode,
-                                          @RequestParam(name = "targetProcessDefinitionCode", required = true) long targetProcessDefinitionCode,
-                                          @RequestParam(name = "taskCode", required = true) long taskCode) {
-        Map<String, Object> result = new HashMap<>();
-        if (processDefinitionCode == 0L) {
-            putMsg(result, DATA_IS_NOT_VALID, "processDefinitionCode");
-        } else if (targetProcessDefinitionCode == 0L) {
-            putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
-        } else if (taskCode == 0L) {
-            putMsg(result, DATA_IS_NOT_VALID, "taskCode");
-        } else {
-            result = processTaskRelationService.moveTaskProcessRelation(loginUser, projectCode, processDefinitionCode,
-                targetProcessDefinitionCode, taskCode);
-        }
-        return returnDataList(result);
-    }
-
-    /**
      * delete process task relation (delete task from workflow)
      *
      * @param loginUser login user
@@ -179,7 +139,7 @@ public class ProcessTaskRelationController extends BaseController {
     @ApiOperation(value = "deleteUpstreamRelation", notes = "DELETE_UPSTREAM_RELATION_NOTES")
     @ApiImplicitParams({
         @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
-        @ApiImplicitParam(name = "preTaskCodes", value = "PRE_TASK_CODES", required = true, type = "String", example = "3,4"),
+        @ApiImplicitParam(name = "preTaskCodes", value = "PRE_TASK_CODES", required = true, type = "String", example = "1,2"),
         @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long")
     })
     @DeleteMapping(value = "/{taskCode}/upstream")
@@ -205,7 +165,7 @@ public class ProcessTaskRelationController extends BaseController {
     @ApiOperation(value = "deleteDownstreamRelation", notes = "DELETE_DOWNSTREAM_RELATION_NOTES")
     @ApiImplicitParams({
         @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
-        @ApiImplicitParam(name = "postTaskCodes", value = "POST_TASK_CODES", required = true, type = "String", example = "3,4"),
+        @ApiImplicitParam(name = "postTaskCodes", value = "POST_TASK_CODES", required = true, type = "String", example = "1,2"),
         @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long")
     })
     @DeleteMapping(value = "/{taskCode}/downstream")
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
index 4246823..f31cc8b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
@@ -43,22 +43,6 @@ public interface ProcessTaskRelationService {
                                                   long postTaskCode);
 
     /**
-     * move task to other processDefinition
-     *
-     * @param loginUser login user info
-     * @param projectCode project code
-     * @param processDefinitionCode process definition code
-     * @param targetProcessDefinitionCode target process definition code
-     * @param taskCode the current task code (the post task code)
-     * @return move result code
-     */
-    Map<String, Object> moveTaskProcessRelation(User loginUser,
-                                                long projectCode,
-                                                long processDefinitionCode,
-                                                long targetProcessDefinitionCode,
-                                                long taskCode);
-
-    /**
      * delete process task relation
      *
      * @param loginUser login user
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index a53bbe7..a864c61 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
-import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
-
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
@@ -26,7 +24,6 @@ import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ConditionType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@@ -47,7 +44,7 @@ import org.apache.commons.collections.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -58,8 +55,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
 
 /**
@@ -191,111 +186,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         return processTaskRelationLog;
     }
 
-    /**
-     * move task to other processDefinition
-     *
-     * @param loginUser                   login user info
-     * @param projectCode                 project code
-     * @param processDefinitionCode       process definition code
-     * @param targetProcessDefinitionCode target process definition code
-     * @param taskCode                    the current task code (the post task code)
-     * @return move result code
-     */
-    @Transactional(rollbackFor = RuntimeException.class)
-    @Override
-    public Map<String, Object> moveTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long taskCode) {
-        Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
-        }
-        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(targetProcessDefinitionCode);
-        if (processDefinition == null) {
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, targetProcessDefinitionCode);
-            return result;
-        }
-        if (processDefinition.getProjectCode() != projectCode) {
-            putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
-            return result;
-        }
-        List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L);
-        if (CollectionUtils.isNotEmpty(downstreamList)) {
-            Set<Long> postTaskCodes = downstreamList
-                .stream()
-                .map(ProcessTaskRelation::getPostTaskCode)
-                .collect(Collectors.toSet());
-            putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
-            return result;
-        }
-        List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode);
-        if (upstreamList.isEmpty()) {
-            putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, "taskCode:" + taskCode);
-            return result;
-        } else {
-            Set<Long> preTaskCodes = upstreamList
-                .stream()
-                .map(ProcessTaskRelation::getPreTaskCode)
-                .collect(Collectors.toSet());
-            if (preTaskCodes.size() > 1 || !preTaskCodes.contains(0L)) {
-                putMsg(result, Status.TASK_HAS_UPSTREAM, org.apache.commons.lang.StringUtils.join(preTaskCodes, ","));
-                return result;
-            }
-        }
-        TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
-        if (null == taskDefinition) {
-            putMsg(result, Status.DATA_IS_NULL, "taskDefinition");
-            return result;
-        }
-        ObjectNode paramNode = JSONUtils.parseObject(taskDefinition.getTaskParams());
-        if (TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())) {
-            Set<Long> depProcessDefinitionCodes = new HashSet<>();
-            ObjectNode dependence = (ObjectNode) paramNode.get("dependence");
-            ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
-            for (int i = 0; i < dependTaskList.size(); i++) {
-                ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
-                ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
-                for (int j = 0; j < dependItemList.size(); j++) {
-                    ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
-                    long definitionCode = dependItem.get("definitionCode").asLong();
-                    depProcessDefinitionCodes.add(definitionCode);
-                }
-            }
-            if (depProcessDefinitionCodes.contains(targetProcessDefinitionCode)) {
-                putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
-                return result;
-            }
-        }
-        if (TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
-            long subProcessDefinitionCode = paramNode.get("processDefinitionCode").asLong();
-            if (targetProcessDefinitionCode == subProcessDefinitionCode) {
-                putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
-                return result;
-            }
-        }
-        updateProcessDefiniteVersion(loginUser, result, processDefinition);
-        Date now = new Date();
-        ProcessTaskRelation processTaskRelation = upstreamList.get(0);
-        ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation);
-        processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
-        processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
-        processTaskRelation.setUpdateTime(now);
-        processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
-        processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
-        processTaskRelationLog.setUpdateTime(now);
-        processTaskRelationLog.setOperator(loginUser.getId());
-        processTaskRelationLog.setOperateTime(now);
-        int update = processTaskRelationMapper.updateById(processTaskRelation);
-        int updateLog = processTaskRelationLogMapper.updateById(processTaskRelationLog);
-        if (update == 0 || updateLog == 0) {
-            putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR);
-            throw new ServiceException(Status.MOVE_PROCESS_TASK_RELATION_ERROR);
-        } else {
-            putMsg(result, Status.SUCCESS);
-        }
-        return result;
-    }
-
     private void updateProcessDefiniteVersion(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition) {
         int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
         if (insertVersion <= 0) {
@@ -337,28 +227,25 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
             return result;
         }
-        List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L);
-        if (CollectionUtils.isNotEmpty(downstreamList)) {
-            Set<Long> postTaskCodes = downstreamList
-                .stream()
-                .map(ProcessTaskRelation::getPostTaskCode)
-                .collect(Collectors.toSet());
-            putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+        if (CollectionUtils.isEmpty(processTaskRelationList)) {
+            putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
             return result;
         }
-        updateProcessDefiniteVersion(loginUser, result, processDefinition);
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(projectCode);
-        processTaskRelationLog.setPostTaskCode(taskCode);
-        processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
-        processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
-        processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
-        int deleteRelation = processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-        int deleteRelationLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
-        if (0 == deleteRelation || 0 == deleteRelationLog) {
-            putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-            throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+        List<Long> downstreamList = Lists.newArrayList();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+            if (processTaskRelation.getPreTaskCode() == taskCode) {
+                downstreamList.add(processTaskRelation.getPostTaskCode());
+            }
+            if (processTaskRelation.getPostTaskCode() == taskCode) {
+                processTaskRelationList.remove(processTaskRelation);
+            }
+        }
+        if (CollectionUtils.isNotEmpty(downstreamList)) {
+            putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(downstreamList, ","));
+            return result;
         }
+        updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType())
             || TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
             || TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
@@ -372,6 +259,21 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         return result;
     }
 
+    private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition,
+                                List<ProcessTaskRelation> processTaskRelationList) {
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
+            processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE);
+        if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+            putMsg(result, Status.SUCCESS);
+            result.put(Constants.DATA_LIST, processDefinition);
+        } else {
+            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+        }
+    }
+
     /**
      * delete task upstream relation
      *
@@ -394,11 +296,42 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
             return result;
         }
-        Status status = deleteUpstreamRelation(loginUser, projectCode,
-            Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new), taskCode);
-        if (status != Status.SUCCESS) {
-            putMsg(result, status);
+        List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+        if (CollectionUtils.isEmpty(upstreamList)) {
+            putMsg(result, Status.DATA_IS_NULL, "taskCode");
+            return result;
+        }
+
+        List<Long> preTaskCodeList = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toList());
+        if (preTaskCodeList.contains(0L)) {
+            putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
+            return result;
+        }
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode());
+            return result;
         }
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
+        List<ProcessTaskRelation> processTaskRelationWaitRemove = Lists.newArrayList();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+            if (preTaskCodeList.size() > 1) {
+                if (preTaskCodeList.contains(processTaskRelation.getPreTaskCode())) {
+                    preTaskCodeList.remove(processTaskRelation.getPreTaskCode());
+                    processTaskRelationWaitRemove.add(processTaskRelation);
+                }
+            } else {
+                if (processTaskRelation.getPostTaskCode() == taskCode) {
+                    processTaskRelation.setPreTaskVersion(0);
+                    processTaskRelation.setPreTaskCode(0L);
+                }
+            }
+            if (preTaskCodeList.contains(processTaskRelation.getPostTaskCode())) {
+                processTaskRelationWaitRemove.add(processTaskRelation);
+            }
+        }
+        processTaskRelationList.removeAll(processTaskRelationWaitRemove);
+        updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         return result;
     }
 
@@ -424,38 +357,24 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.DATA_IS_NULL, "postTaskCodes");
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
-        Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
-            processTaskRelationList.stream()
-                .map(ProcessTaskRelationLog::new)
-                .collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode, processTaskRelationLog -> processTaskRelationLog));
-        Set<Long> postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
-        int delete = 0;
-        int deleteLog = 0;
-        Set<Long> processCodeSet = new HashSet<>();
-        for (long postTaskCode : postTaskCodesSet) {
-            ProcessTaskRelationLog processTaskRelationLog = taskRelationLogMap.get(postTaskCode);
-            if (processTaskRelationLog != null) {
-                delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
-                processCodeSet.add(processTaskRelationLog.getProcessDefinitionCode());
-            }
+        List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
+        if (CollectionUtils.isEmpty(downstreamList)) {
+            putMsg(result, Status.DATA_IS_NULL, "taskCode");
+            return result;
         }
-        for (long code : processCodeSet) {
-            ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
-            if (processDefinition == null) {
-                throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
-            }
-            int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
-            if (insertVersion <= 0) {
-                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
-            }
+        List<Long> postTaskCodeList = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toList());
+        if (postTaskCodeList.contains(0L)) {
+            putMsg(result, Status.DATA_IS_NULL, "postTaskCodes");
+            return result;
         }
-        if ((delete & deleteLog) == 0) {
-            throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-        } else {
-            putMsg(result, Status.SUCCESS);
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(downstreamList.get(0).getProcessDefinitionCode());
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, downstreamList.get(0).getProcessDefinitionCode());
+            return result;
         }
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
+        processTaskRelationList.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && processTaskRelation.getPreTaskCode() == taskCode);
+        updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         return result;
     }
 
@@ -555,47 +474,44 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode);
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
         if (CollectionUtils.isEmpty(processTaskRelationList)) {
             putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
             return result;
         }
-        if (processTaskRelationList.size() > 1) {
-            putMsg(result, Status.DATA_IS_NOT_VALID, "processTaskRelationList");
-            return result;
+        Map<Long, List<ProcessTaskRelation>> taskRelationMap = new HashMap<>();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+            taskRelationMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
+                if (v == null) {
+                    v = new ArrayList<>();
+                }
+                v.add(processTaskRelation);
+                return v;
+            });
         }
-        ProcessTaskRelation processTaskRelation = processTaskRelationList.get(0);
-        int upstreamCount = processTaskRelationMapper.countByCode(projectCode, processTaskRelation.getProcessDefinitionCode(),
-            0L, processTaskRelation.getPostTaskCode());
-
-        if (upstreamCount == 0) {
-            putMsg(result, Status.DATA_IS_NULL, "upstreamCount");
+        if (!taskRelationMap.containsKey(postTaskCode)) {
+            putMsg(result, Status.DATA_IS_NULL, "postTaskCode");
             return result;
         }
-        if (upstreamCount > 1) {
-            int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId());
-            if (delete == 0) {
-                putMsg(result, Status.DELETE_EDGE_ERROR);
+        if (taskRelationMap.get(postTaskCode).size() > 1) {
+            for (ProcessTaskRelation processTaskRelation : taskRelationMap.get(postTaskCode)) {
+                if (processTaskRelation.getPreTaskCode() == preTaskCode) {
+                    int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId());
+                    if (delete == 0) {
+                        putMsg(result, Status.DELETE_EDGE_ERROR);
+                        throw new ServiceException(Status.DELETE_EDGE_ERROR);
+                    }
+                    processTaskRelationList.remove(processTaskRelation);
+                }
             }
-            return result;
-        }
-        updateProcessDefiniteVersion(loginUser, result, processDefinition);
-        processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
-        processTaskRelation.setPreTaskVersion(0);
-        processTaskRelation.setPreTaskCode(0L);
-        Date now = new Date();
-        processTaskRelation.setUpdateTime(now);
-        int update = processTaskRelationMapper.updateById(processTaskRelation);
-        processTaskRelation.setId(0);
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
-        processTaskRelationLog.setCreateTime(now);
-        processTaskRelationLog.setOperator(loginUser.getId());
-        processTaskRelationLog.setOperateTime(now);
-        int insert = processTaskRelationLogMapper.insert(processTaskRelationLog);
-        if ((update & insert) == 0) {
-            putMsg(result, Status.DELETE_EDGE_ERROR);
-            throw new ServiceException(Status.DELETE_EDGE_ERROR);
+        } else {
+            ProcessTaskRelation processTaskRelation = taskRelationMap.get(postTaskCode).get(0);
+            processTaskRelationList.remove(processTaskRelation);
+            processTaskRelation.setPreTaskVersion(0);
+            processTaskRelation.setPreTaskCode(0L);
+            processTaskRelationList.add(processTaskRelation);
         }
+        updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         return result;
     }
 
@@ -627,80 +543,4 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             }
         };
     }
-
-    /**
-     * delete upstream relation
-     *
-     * @param projectCode  project code
-     * @param preTaskCodes pre task codes
-     * @param taskCode     pre task code
-     * @return status
-     */
-    private Status deleteUpstreamRelation(User loginUser, long projectCode, Long[] preTaskCodes, long taskCode) {
-        List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes);
-        if (CollectionUtils.isEmpty(upstreamList)) {
-            return Status.SUCCESS;
-        }
-        List<ProcessTaskRelationLog> upstreamLogList = new ArrayList<>();
-        Date now = new Date();
-        for (ProcessTaskRelation processTaskRelation : upstreamList) {
-            ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
-            processTaskRelationLog.setOperator(loginUser.getId());
-            processTaskRelationLog.setOperateTime(now);
-            processTaskRelationLog.setUpdateTime(now);
-            upstreamLogList.add(processTaskRelationLog);
-        }
-        Map<Long, List<ProcessTaskRelationLog>> processTaskRelationListGroupByProcessDefinitionCode = upstreamLogList.stream()
-            .collect(Collectors.groupingBy(ProcessTaskRelationLog::getProcessDefinitionCode));
-        // count upstream relation group by process definition code
-        List<Map<String, Long>> countListGroupByProcessDefinitionCode = processTaskRelationMapper
-            .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new Long[0]), taskCode);
-
-        List<ProcessTaskRelationLog> deletes = new ArrayList<>();
-        List<ProcessTaskRelationLog> updates = new ArrayList<>();
-        for (Map<String, Long> codeCountMap : countListGroupByProcessDefinitionCode) {
-            long processDefinitionCode = codeCountMap.get("processDefinitionCode");
-            long countValue = codeCountMap.get("countValue");
-            ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
-            if (processDefinition == null) {
-                return Status.PROCESS_DEFINE_NOT_EXIST;
-            }
-            int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
-            if (insertVersion <= 0) {
-                throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
-            }
-            List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
-            if (countValue <= processTaskRelationLogList.size()) {
-                ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogList.remove(0);
-                if (processTaskRelationLog.getPreTaskCode() != 0) {
-                    processTaskRelationLog.setPreTaskCode(0);
-                    processTaskRelationLog.setPreTaskVersion(0);
-                }
-                processTaskRelationLog.setProcessDefinitionVersion(insertVersion);
-                updates.add(processTaskRelationLog);
-            }
-            if (!processTaskRelationLogList.isEmpty()) {
-                deletes.addAll(processTaskRelationLogList);
-            }
-        }
-        deletes.addAll(updates);
-        int delete = 0;
-        int deleteLog = 0;
-        for (ProcessTaskRelationLog processTaskRelationLog : deletes) {
-            delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-            deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
-        }
-        if ((delete & deleteLog) == 0) {
-            throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-        } else {
-            if (!updates.isEmpty()) {
-                int insert = processTaskRelationMapper.batchInsert(updates);
-                int insertLog = processTaskRelationLogMapper.batchInsert(updates);
-                if ((insert & insertLog) == 0) {
-                    throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-                }
-            }
-        }
-        return Status.SUCCESS;
-    }
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 749ebac..2ed0f61 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -201,7 +201,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
             return result;
         }
-        if (taskDefinition.getFlag() == Flag.YES) {
+        if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
             putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
             return result;
         }
@@ -577,7 +577,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                     }
                 }
                 taskDefinition.setFlag(Flag.YES);
-                taskDefinitionLog.setFlag(Flag.NO);
+                taskDefinitionLog.setFlag(Flag.YES);
                 break;
             default:
                 putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
index 5db7868..f21de2a 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
@@ -290,48 +290,6 @@ public class ProcessTaskRelationServiceTest {
     }
 
     @Test
-    public void testMoveTaskProcessRelation() {
-        long projectCode = 1L;
-        long processDefinitionCode = 1L;
-        long taskCode = 1L;
-
-        Project project = getProject(projectCode);
-        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
-
-        User loginUser = new User();
-        loginUser.setId(-1);
-        loginUser.setUserType(UserType.GENERAL_USER);
-
-        Map<String, Object> result = new HashMap<>();
-        putMsg(result, Status.SUCCESS, projectCode);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
-        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
-        Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L)).thenReturn(Lists.newArrayList());
-        Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
-        List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
-        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
-        processTaskRelation.setProjectCode(projectCode);
-        processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
-        processTaskRelation.setPreTaskCode(0L);
-        processTaskRelation.setPreTaskVersion(0);
-        processTaskRelation.setPostTaskCode(taskCode);
-        processTaskRelation.setPostTaskVersion(1);
-        processTaskRelationList.add(processTaskRelation);
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(projectCode);
-        processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
-        processTaskRelationLog.setPreTaskCode(0L);
-        processTaskRelationLog.setPreTaskVersion(0);
-        processTaskRelationLog.setPostTaskCode(taskCode);
-        processTaskRelationLog.setPostTaskVersion(1);
-        Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode)).thenReturn(processTaskRelationList);
-        Mockito.when(processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation)).thenReturn(processTaskRelationLog);
-        Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
-        Mockito.when(processTaskRelationLogMapper.updateById(processTaskRelationLog)).thenReturn(1);
-        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
-    }
-
-    @Test
     public void testQueryDownstreamRelation() {
         long projectCode = 1L;
         long taskCode = 2L;
@@ -479,38 +437,23 @@ public class ProcessTaskRelationServiceTest {
         loginUser.setUserType(UserType.GENERAL_USER);
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.SUCCESS, projectCode);
+        List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        processTaskRelation.setProjectCode(projectCode);
+        processTaskRelation.setProcessDefinitionCode(1L);
+        processTaskRelation.setPreTaskCode(0L);
+        processTaskRelation.setPreTaskVersion(0);
+        processTaskRelation.setPostTaskCode(taskCode);
+        processTaskRelation.setPostTaskVersion(1);
+        processTaskRelationList.add(processTaskRelation);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
-        Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(Lists.newArrayList());
-        List<Map<String, Long>> countListGroupByProcessDefinitionCode = new ArrayList<>();
-        countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
-            {
-                put("processDefinitionCode", 123L);
-                put("countValue", 2L);
-            }
-        });
-        countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
-            {
-                put("processDefinitionCode", 124L);
-                put("countValue", 1L);
-            }
-        });
-        countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
-            {
-                put("processDefinitionCode", 125L);
-                put("countValue", 3L);
-            }
-        });
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(projectCode);
-        processTaskRelationLog.setPreTaskCode(0L);
-        processTaskRelationLog.setPreTaskVersion(0);
-        processTaskRelationLog.setPostTaskCode(taskCode);
-        processTaskRelationLog.setPostTaskVersion(2);
-        Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode);
-        Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
-        Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
-        Map<String, Object> result1 = processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode, "123", taskCode);
-        Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
+        Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList);
+        Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(getProcessDefinition());
+        Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList);
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
+            1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
+        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
 
     @Test
@@ -530,26 +473,24 @@ public class ProcessTaskRelationServiceTest {
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
         Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(Lists.newArrayList());
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(projectCode);
-        processTaskRelationLog.setPreTaskCode(taskCode);
-        processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
-        Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
         Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
         Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
         TaskDefinition taskDefinition = new TaskDefinition();
         taskDefinition.setTaskType(TaskType.CONDITIONS.getDesc());
         Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
-        Mockito.when(taskDefinitionMapper.deleteByCode(taskCode)).thenReturn(1);
-        processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(projectCode);
-        processTaskRelationLog.setPostTaskCode(taskCode);
-        processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
-        processTaskRelationLog.setProcessDefinitionVersion(1);
-        Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
-        Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
-        Mockito.when(processService.saveProcessDefine(loginUser, getProcessDefinition(), Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
-        result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode);
+        List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        processTaskRelation.setProjectCode(projectCode);
+        processTaskRelation.setProcessDefinitionCode(1L);
+        processTaskRelation.setPreTaskCode(0L);
+        processTaskRelation.setPreTaskVersion(0);
+        processTaskRelation.setPostTaskCode(taskCode);
+        processTaskRelation.setPostTaskVersion(1);
+        processTaskRelationList.add(processTaskRelation);
+        Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)).thenReturn(processTaskRelationList);
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
+            1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
 
@@ -578,15 +519,11 @@ public class ProcessTaskRelationServiceTest {
         processTaskRelationLog.setOperator(loginUser.getId());
         List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
         processTaskRelationList.add(processTaskRelation);
-        Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(processTaskRelationList);
-        Mockito.when(processTaskRelationMapper.countByCode(projectCode, processDefinitionCode, 0L, postTaskCode)).thenReturn(1);
-        Mockito.when(processTaskRelationMapper.deleteById(processTaskRelation.getId())).thenReturn(1);
-        Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
-        Mockito.when(processTaskRelationLogMapper.insert(processTaskRelationLog)).thenReturn(1);
-        ProcessDefinition processDefinition = getProcessDefinition();
-        Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
-        Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
-        result = processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode);
+        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
+        Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList);
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
+            1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
     }
 }