You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/11/23 10:45:29 UTC

[GitHub] [dolphinscheduler] JinyLeeChina commented on a change in pull request #6966: to #6957 implement interface deleteUpstreamRelation deleteDownstreamRelation

JinyLeeChina commented on a change in pull request #6966:
URL: https://github.com/apache/dolphinscheduler/pull/6966#discussion_r754971932



##########
File path: dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
##########
@@ -100,4 +101,42 @@ int deleteByCode(@Param("projectCode") long projectCode,
      * @return ProcessTaskRelation
      */
     List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode);
+
+    /**
+     * query task relation by codes
+     *
+     * @param projectCode   projectCode
+     * @param taskCode      taskCode
+     * @param postTaskCodes postTaskCodes list
+     * @return ProcessTaskRelation
+     */
+    List<ProcessTaskRelation> queryDownstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("postTaskCodes") Long[] postTaskCodes);
+
+    /**
+     * query task relation by codes
+     *
+     * @param projectCode  projectCode
+     * @param taskCode     taskCode
+     * @param preTaskCodes preTaskCode list
+     * @return ProcessTaskRelation
+     */
+    List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("preTaskCodes") Long[] preTaskCodes);
+
+
+    /**
+     * count upstream by codes
+     *
+     * @param projectCode projectCode
+     * @param taskCodes    taskCode
+     * @return upstream count list group by process definition code
+     */
+    List<Map<Long,Integer>> countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long projectCode, @Param("taskCodes")  long taskCodes);

Review comment:
       Please follow the project's naming conventions here.

##########
File path: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
##########
@@ -114,29 +116,70 @@
     /**
      * delete task upstream relation
      *
-     * @param loginUser login user
-     * @param projectCode project code
+     * @param loginUser    login user
+     * @param projectCode  project code
      * @param preTaskCodes the pre task codes, sep ','
-     * @param taskCode the post task code
+     * @param taskCode     the post task code
      * @return delete result code
      */
     @Override
     public Map<String, Object> deleteUpstreamRelation(User loginUser, long projectCode, String preTaskCodes, long taskCode) {
-        return null;
+        Project project = projectMapper.queryByCode(projectCode);
+        //check user access for project
+        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+        if (org.apache.commons.lang.StringUtils.isEmpty(preTaskCodes)) {

Review comment:
       Please use DolphinScheduler's internal utility method for this check, and perform the check in the controller.

##########
File path: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
##########
@@ -240,4 +283,59 @@ public int hashCode() {
         };
     }
 
+    /**
+     * delete upstream relation
+     *
+     * @param projectCode  project code
+     * @param preTaskCodes pre task codes
+     * @param taskCode     pre task code
+     * @return status
+     */
+    private Status deleteUpstreamRelation(long projectCode, Long[] preTaskCodes, long taskCode) {
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes);
+        if (CollectionUtils.isEmpty(processTaskRelationList)) {
+            return Status.SUCCESS;
+        }
+        Map<Long, List<ProcessTaskRelation>> processTaskRelationListGroupByProcessDefinitionCode = processTaskRelationList.stream()
+                .collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
+        // count upstream relation group by process definition code
+        List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, taskCode);
+
+        List<Integer> deletes = new ArrayList<>();
+        List<ProcessTaskRelation> updates = new ArrayList<>();
+
+        countListGroupByProcessDefinitionCode.stream().forEach(
+                processDefinitionCodeUpstreamCountMap ->
+                        processDefinitionCodeUpstreamCountMap.entrySet().stream().forEach(
+                                o -> {
+                                    Long processDefinitionCode = o.getKey();
+                                    Integer count = o.getValue();
+                                    List<ProcessTaskRelation> processTaskRelationList1 = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
+                                    if (count <= processTaskRelationList1.size()) {
+                                        ProcessTaskRelation processTaskRelation = processTaskRelationList1.remove(0);
+                                        if (processTaskRelation.getPreTaskCode() != 0) {
+                                            processTaskRelation.setPreTaskCode(0);
+                                            updates.add(processTaskRelation);

Review comment:
       Please set version=0




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org