You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/02/07 02:31:18 UTC

[dolphinscheduler] branch 2.0.4-prepare updated: fix task api update bug (#8270)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.4-prepare by this push:
     new d5d7184  fix task api update bug (#8270)
d5d7184 is described below

commit d5d71842e9e8408ce1a38861f651e93130dfae4e
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Mon Feb 7 10:31:08 2022 +0800

    fix task api update bug (#8270)
---
 .../impl/ProcessTaskRelationServiceImpl.java       | 100 +++++++++------------
 .../service/impl/TaskDefinitionServiceImpl.java    |  93 ++++++++-----------
 2 files changed, 80 insertions(+), 113 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index a864c61..a9b8291 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -115,75 +115,58 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
             return result;
         }
-        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, postTaskCode);
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
+        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
         if (!processTaskRelations.isEmpty()) {
-            Map<Long, ProcessTaskRelation> preTaskCodeMap = processTaskRelations.stream()
+            Map<Long, ProcessTaskRelation> preTaskCodeMap = processTaskRelations.stream().filter(r -> r.getPostTaskCode() == postTaskCode)
                 .collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelation -> processTaskRelation));
-            if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
-                putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode);
-                return result;
-            }
-            if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
-                ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(preTaskCodeMap.get(0L));
-                // delete no upstream
-                int delete = processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                int deleteLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
-                if ((delete & deleteLog) == 0) {
-                    putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-                    throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+            if (!preTaskCodeMap.isEmpty()) {
+                if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
+                    putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode);
+                    return result;
+                }
+                if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
+                    // delete no upstream
+                    processTaskRelations.remove(preTaskCodeMap.get(0L));
                 }
             }
         }
-        updateProcessDefiniteVersion(loginUser, result, processDefinition);
-        Date now = new Date();
-        List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
+        TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
+        ProcessTaskRelation processTaskRelation = setRelation(processDefinition, postTaskDefinition);
         if (preTaskCode != 0L) {
-            // upstream is or not exist
-            List<ProcessTaskRelation> upstreamProcessTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, preTaskCode);
             TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(preTaskCode);
-            if (upstreamProcessTaskRelations.isEmpty()) {
-                ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), preTaskDefinition);
-                processTaskRelationLog.setPreTaskCode(0L);
-                processTaskRelationLog.setPreTaskVersion(0);
-                processTaskRelationLogs.add(processTaskRelationLog);
+            List<ProcessTaskRelation> upstreamTaskRelationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() == preTaskCode).collect(Collectors.toList());
+            // upstream is or not exist
+            if (upstreamTaskRelationList.isEmpty()) {
+                ProcessTaskRelation preProcessTaskRelation = setRelation(processDefinition, preTaskDefinition);
+                preProcessTaskRelation.setPreTaskCode(0L);
+                preProcessTaskRelation.setPreTaskVersion(0);
+                processTaskRelations.add(preProcessTaskRelation);
             }
-            TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
-            ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
-            processTaskRelationLog.setPreTaskCode(preTaskDefinition.getCode());
-            processTaskRelationLog.setPreTaskVersion(preTaskDefinition.getVersion());
-            processTaskRelationLogs.add(processTaskRelationLog);
-        } else {
-            TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
-            ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
-            processTaskRelationLog.setPreTaskCode(0L);
-            processTaskRelationLog.setPreTaskVersion(0);
-            processTaskRelationLogs.add(processTaskRelationLog);
-        }
-        int insert = processTaskRelationMapper.batchInsert(processTaskRelationLogs);
-        int insertLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs);
-        if ((insert & insertLog) > 0) {
-            putMsg(result, Status.SUCCESS);
+            processTaskRelation.setPreTaskCode(preTaskDefinition.getCode());
+            processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion());
         } else {
-            putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-            throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+            processTaskRelation.setPreTaskCode(0L);
+            processTaskRelation.setPreTaskVersion(0);
         }
+        processTaskRelations.add(processTaskRelation);
+        updateRelation(loginUser, result, processDefinition, processTaskRelations);
         return result;
     }
 
-    private ProcessTaskRelationLog setRelationLog(ProcessDefinition processDefinition, Date now, int userId, TaskDefinition taskDefinition) {
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
-        processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
-        processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
-        processTaskRelationLog.setPostTaskCode(taskDefinition.getCode());
-        processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
-        processTaskRelationLog.setConditionType(ConditionType.NONE);
-        processTaskRelationLog.setConditionParams("{}");
-        processTaskRelationLog.setCreateTime(now);
-        processTaskRelationLog.setUpdateTime(now);
-        processTaskRelationLog.setOperator(userId);
-        processTaskRelationLog.setOperateTime(now);
-        return processTaskRelationLog;
+    private ProcessTaskRelation setRelation(ProcessDefinition processDefinition, TaskDefinition taskDefinition) {
+        Date now = new Date();
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        processTaskRelation.setProjectCode(processDefinition.getProjectCode());
+        processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
+        processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
+        processTaskRelation.setPostTaskCode(taskDefinition.getCode());
+        processTaskRelation.setPostTaskVersion(taskDefinition.getVersion());
+        processTaskRelation.setConditionType(ConditionType.NONE);
+        processTaskRelation.setConditionParams("{}");
+        processTaskRelation.setCreateTime(now);
+        processTaskRelation.setUpdateTime(now);
+        return processTaskRelation;
     }
 
     private void updateProcessDefiniteVersion(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition) {
@@ -245,6 +228,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(downstreamList, ","));
             return result;
         }
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType())
             || TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
@@ -261,7 +245,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
 
     private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition,
                                 List<ProcessTaskRelation> processTaskRelationList) {
-        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
         int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
             processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE);
@@ -331,6 +314,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             }
         }
         processTaskRelationList.removeAll(processTaskRelationWaitRemove);
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         return result;
     }
@@ -374,6 +358,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         }
         List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
         processTaskRelationList.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && processTaskRelation.getPreTaskCode() == taskCode);
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         return result;
     }
@@ -511,6 +496,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             processTaskRelation.setPreTaskCode(0L);
             processTaskRelationList.add(processTaskRelation);
         }
+        updateProcessDefiniteVersion(loginUser, result, processDefinition);
         updateRelation(loginUser, result, processDefinition, processTaskRelationList);
         return result;
     }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 2ed0f61..199670d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -28,9 +28,9 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@@ -67,6 +66,7 @@ import org.springframework.transaction.annotation.Transactional;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.google.common.collect.Lists;
 
 /**
  * task definition service impl
@@ -94,9 +94,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
     private ProcessTaskRelationMapper processTaskRelationMapper;
 
     @Autowired
-    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
-
-    @Autowired
     private ProcessDefinitionMapper processDefinitionMapper;
 
     @Autowired
@@ -218,17 +215,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         if (delete > 0) {
             List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
             if (!taskRelationList.isEmpty()) {
-                int deleteRelation = 0;
-                for (ProcessTaskRelation processTaskRelation : taskRelationList) {
-                    deleteRelation += processTaskRelationMapper.deleteById(processTaskRelation.getId());
-                }
-                if (deleteRelation == 0) {
-                    throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-                }
                 long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
-                updateProcessDefiniteVersion(loginUser, processDefinitionCode);
+                List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+                List<ProcessTaskRelation> relationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList());
+                updateDag(loginUser, result, processDefinitionCode, relationList, Lists.newArrayList());
+            } else {
+                putMsg(result, Status.SUCCESS);
             }
-            putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
             throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
@@ -236,7 +229,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         return result;
     }
 
-    private int updateProcessDefiniteVersion(User loginUser, long processDefinitionCode) {
+    private void updateDag(User loginUser, Map<String, Object> result, long processDefinitionCode, List<ProcessTaskRelation> processTaskRelationList,
+                                List<TaskDefinitionLog> taskDefinitionLogs) {
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
         if (processDefinition == null) {
             throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
@@ -245,7 +239,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         if (insertVersion <= 0) {
             throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
         }
-        return insertVersion;
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+        int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
+            insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE);
+        if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+            putMsg(result, Status.SUCCESS);
+            result.put(Constants.DATA_LIST, processDefinition);
+        } else {
+            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+        }
     }
 
     /**
@@ -308,54 +311,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
         }
-        handleRelation(loginUser, taskCode, version, now);
+        List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+        if (!taskRelationList.isEmpty()) {
+            long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
+            List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+            updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate));
+        } else {
+            putMsg(result, Status.SUCCESS);
+        }
         result.put(Constants.DATA_LIST, taskCode);
         putMsg(result, Status.SUCCESS, update);
         return result;
     }
 
-    private void handleRelation(User loginUser, long taskCode, Integer version, Date now) {
-        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
-        if (!processTaskRelationList.isEmpty()) {
-            long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode();
-            int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode);
-            List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
-            int delete = 0;
-            for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
-                ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
-                delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
-                if (processTaskRelationLog.getPreTaskCode() == taskCode) {
-                    processTaskRelationLog.setPreTaskVersion(version);
-                }
-                if (processTaskRelationLog.getPostTaskCode() == taskCode) {
-                    processTaskRelationLog.setPostTaskVersion(version);
-                }
-                processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
-                processTaskRelationLog.setOperator(loginUser.getId());
-                processTaskRelationLog.setOperateTime(now);
-                processTaskRelationLog.setUpdateTime(now);
-                processTaskRelationLogList.add(processTaskRelationLog);
-            }
-            if (delete == 0) {
-                throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-            } else {
-                int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
-                int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
-                if ((insertRelation & insertRelationLog) == 0) {
-                    throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-                }
-            }
-        }
-    }
-
     /**
-     * update task definition
+     * switch task definition
      *
      * @param loginUser login user
      * @param projectCode project code
      * @param taskCode task code
      * @param version the version user want to switch
      */
+    @Transactional(rollbackFor = RuntimeException.class)
     @Override
     public Map<String, Object> switchVersion(User loginUser, long projectCode, long taskCode, int version) {
         Project project = projectMapper.queryByCode(projectCode);
@@ -375,14 +352,18 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         }
         TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
         taskDefinitionUpdate.setUserId(loginUser.getId());
-        Date now = new Date();
         taskDefinitionUpdate.setUpdateTime(new Date());
         taskDefinitionUpdate.setId(taskDefinition.getId());
         int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate);
         if (switchVersion > 0) {
-            handleRelation(loginUser, taskCode, version, now);
-            result.put(Constants.DATA_LIST, taskCode);
-            putMsg(result, Status.SUCCESS);
+            List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+            if (!taskRelationList.isEmpty()) {
+                long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
+                List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+                updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionUpdate));
+            } else {
+                putMsg(result, Status.SUCCESS);
+            }
         } else {
             putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR);
         }