You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/11/23 10:45:29 UTC
[GitHub] [dolphinscheduler] JinyLeeChina commented on a change in pull request #6966: to #6957 implement interface deleteUpstreamRelation deleteDownstreamRelation
JinyLeeChina commented on a change in pull request #6966:
URL: https://github.com/apache/dolphinscheduler/pull/6966#discussion_r754971932
##########
File path: dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
##########
@@ -100,4 +101,42 @@ int deleteByCode(@Param("projectCode") long projectCode,
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode);
+
+ /**
+ * query task relation by codes
+ *
+ * @param projectCode projectCode
+ * @param taskCode taskCode
+ * @param postTaskCodes postTaskCodes list
+ * @return ProcessTaskRelation
+ */
+ List<ProcessTaskRelation> queryDownstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("postTaskCodes") Long[] postTaskCodes);
+
+ /**
+ * query task relation by codes
+ *
+ * @param projectCode projectCode
+ * @param taskCode taskCode
+ * @param preTaskCodes preTaskCode list
+ * @return ProcessTaskRelation
+ */
+ List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("preTaskCodes") Long[] preTaskCodes);
+
+
+ /**
+ * count upstream by codes
+ *
+ * @param projectCode projectCode
+ * @param taskCodes taskCode
+ * @return upstream count list group by process definition code
+ */
+ List<Map<Long,Integer>> countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long projectCode, @Param("taskCodes") long taskCodes);
Review comment:
Please note the naming specification.
##########
File path: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
##########
@@ -114,29 +116,70 @@
/**
* delete task upstream relation
*
- * @param loginUser login user
- * @param projectCode project code
+ * @param loginUser login user
+ * @param projectCode project code
* @param preTaskCodes the pre task codes, sep ','
- * @param taskCode the post task code
+ * @param taskCode the post task code
* @return delete result code
*/
@Override
public Map<String, Object> deleteUpstreamRelation(User loginUser, long projectCode, String preTaskCodes, long taskCode) {
- return null;
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+ if (org.apache.commons.lang.StringUtils.isEmpty(preTaskCodes)) {
Review comment:
Please use DolphinScheduler's internal utility method for this empty check, and perform the check in the controller.
##########
File path: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
##########
@@ -240,4 +283,59 @@ public int hashCode() {
};
}
+ /**
+ * delete upstream relation
+ *
+ * @param projectCode project code
+ * @param preTaskCodes pre task codes
+ * @param taskCode post task code
+ * @return status
+ */
+ private Status deleteUpstreamRelation(long projectCode, Long[] preTaskCodes, long taskCode) {
+ List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes);
+ if (CollectionUtils.isEmpty(processTaskRelationList)) {
+ return Status.SUCCESS;
+ }
+ Map<Long, List<ProcessTaskRelation>> processTaskRelationListGroupByProcessDefinitionCode = processTaskRelationList.stream()
+ .collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
+ // count upstream relation group by process definition code
+ List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, taskCode);
+
+ List<Integer> deletes = new ArrayList<>();
+ List<ProcessTaskRelation> updates = new ArrayList<>();
+
+ countListGroupByProcessDefinitionCode.stream().forEach(
+ processDefinitionCodeUpstreamCountMap ->
+ processDefinitionCodeUpstreamCountMap.entrySet().stream().forEach(
+ o -> {
+ Long processDefinitionCode = o.getKey();
+ Integer count = o.getValue();
+ List<ProcessTaskRelation> processTaskRelationList1 = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
+ if (count <= processTaskRelationList1.size()) {
+ ProcessTaskRelation processTaskRelation = processTaskRelationList1.remove(0);
+ if (processTaskRelation.getPreTaskCode() != 0) {
+ processTaskRelation.setPreTaskCode(0);
+ updates.add(processTaskRelation);
Review comment:
Please set version=0
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org