You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/25 12:17:39 UTC
[dolphinscheduler] branch 2.0.4-prepare updated: fix relation delete (#8190)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 2.0.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.4-prepare by this push:
new a24a816 fix relation delete (#8190)
a24a816 is described below
commit a24a81685aaddbf86ad6388e9f2e31b5ce404b14
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Tue Jan 25 20:17:33 2022 +0800
fix relation delete (#8190)
---
.../controller/ProcessTaskRelationController.java | 44 +--
.../api/service/ProcessTaskRelationService.java | 16 -
.../impl/ProcessTaskRelationServiceImpl.java | 382 ++++++---------------
.../service/impl/TaskDefinitionServiceImpl.java | 4 +-
.../service/ProcessTaskRelationServiceTest.java | 131 ++-----
5 files changed, 149 insertions(+), 428 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
index 54887b3..710c8ac 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
@@ -102,46 +102,6 @@ public class ProcessTaskRelationController extends BaseController {
}
/**
- * move task to other processDefinition
- *
- * @param loginUser login user info
- * @param projectCode project code
- * @param processDefinitionCode process definition code
- * @param targetProcessDefinitionCode target process definition code
- * @param taskCode the current task code (the post task code)
- * @return move result code
- */
- @ApiOperation(value = "moveRelation", notes = "MOVE_TASK_TO_OTHER_PROCESS_DEFINITION_NOTES")
- @ApiImplicitParams({
- @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
- @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, type = "Long"),
- @ApiImplicitParam(name = "targetProcessDefinitionCode", value = "TARGET_PROCESS_DEFINITION_CODE", required = true, type = "Long"),
- @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long")
- })
- @PostMapping(value = "/move")
- @ResponseStatus(HttpStatus.OK)
- @ApiException(MOVE_PROCESS_TASK_RELATION_ERROR)
- @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
- public Result moveTaskProcessRelation(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
- @RequestParam(name = "processDefinitionCode", required = true) long processDefinitionCode,
- @RequestParam(name = "targetProcessDefinitionCode", required = true) long targetProcessDefinitionCode,
- @RequestParam(name = "taskCode", required = true) long taskCode) {
- Map<String, Object> result = new HashMap<>();
- if (processDefinitionCode == 0L) {
- putMsg(result, DATA_IS_NOT_VALID, "processDefinitionCode");
- } else if (targetProcessDefinitionCode == 0L) {
- putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
- } else if (taskCode == 0L) {
- putMsg(result, DATA_IS_NOT_VALID, "taskCode");
- } else {
- result = processTaskRelationService.moveTaskProcessRelation(loginUser, projectCode, processDefinitionCode,
- targetProcessDefinitionCode, taskCode);
- }
- return returnDataList(result);
- }
-
- /**
* delete process task relation (delete task from workflow)
*
* @param loginUser login user
@@ -179,7 +139,7 @@ public class ProcessTaskRelationController extends BaseController {
@ApiOperation(value = "deleteUpstreamRelation", notes = "DELETE_UPSTREAM_RELATION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
- @ApiImplicitParam(name = "preTaskCodes", value = "PRE_TASK_CODES", required = true, type = "String", example = "3,4"),
+ @ApiImplicitParam(name = "preTaskCodes", value = "PRE_TASK_CODES", required = true, type = "String", example = "1,2"),
@ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long")
})
@DeleteMapping(value = "/{taskCode}/upstream")
@@ -205,7 +165,7 @@ public class ProcessTaskRelationController extends BaseController {
@ApiOperation(value = "deleteDownstreamRelation", notes = "DELETE_DOWNSTREAM_RELATION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
- @ApiImplicitParam(name = "postTaskCodes", value = "POST_TASK_CODES", required = true, type = "String", example = "3,4"),
+ @ApiImplicitParam(name = "postTaskCodes", value = "POST_TASK_CODES", required = true, type = "String", example = "1,2"),
@ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long")
})
@DeleteMapping(value = "/{taskCode}/downstream")
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
index 4246823..f31cc8b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
@@ -43,22 +43,6 @@ public interface ProcessTaskRelationService {
long postTaskCode);
/**
- * move task to other processDefinition
- *
- * @param loginUser login user info
- * @param projectCode project code
- * @param processDefinitionCode process definition code
- * @param targetProcessDefinitionCode target process definition code
- * @param taskCode the current task code (the post task code)
- * @return move result code
- */
- Map<String, Object> moveTaskProcessRelation(User loginUser,
- long projectCode,
- long processDefinitionCode,
- long targetProcessDefinitionCode,
- long taskCode);
-
- /**
* delete process task relation
*
* @param loginUser login user
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index a53bbe7..a864c61 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.api.service.impl;
-import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
-
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
@@ -26,7 +24,6 @@ import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@@ -47,7 +44,7 @@ import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Date;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -58,8 +55,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
/**
@@ -191,111 +186,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return processTaskRelationLog;
}
- /**
- * move task to other processDefinition
- *
- * @param loginUser login user info
- * @param projectCode project code
- * @param processDefinitionCode process definition code
- * @param targetProcessDefinitionCode target process definition code
- * @param taskCode the current task code (the post task code)
- * @return move result code
- */
- @Transactional(rollbackFor = RuntimeException.class)
- @Override
- public Map<String, Object> moveTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long taskCode) {
- Project project = projectMapper.queryByCode(projectCode);
- //check user access for project
- Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
- return result;
- }
- ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(targetProcessDefinitionCode);
- if (processDefinition == null) {
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, targetProcessDefinitionCode);
- return result;
- }
- if (processDefinition.getProjectCode() != projectCode) {
- putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
- return result;
- }
- List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L);
- if (CollectionUtils.isNotEmpty(downstreamList)) {
- Set<Long> postTaskCodes = downstreamList
- .stream()
- .map(ProcessTaskRelation::getPostTaskCode)
- .collect(Collectors.toSet());
- putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
- return result;
- }
- List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode);
- if (upstreamList.isEmpty()) {
- putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, "taskCode:" + taskCode);
- return result;
- } else {
- Set<Long> preTaskCodes = upstreamList
- .stream()
- .map(ProcessTaskRelation::getPreTaskCode)
- .collect(Collectors.toSet());
- if (preTaskCodes.size() > 1 || !preTaskCodes.contains(0L)) {
- putMsg(result, Status.TASK_HAS_UPSTREAM, org.apache.commons.lang.StringUtils.join(preTaskCodes, ","));
- return result;
- }
- }
- TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
- if (null == taskDefinition) {
- putMsg(result, Status.DATA_IS_NULL, "taskDefinition");
- return result;
- }
- ObjectNode paramNode = JSONUtils.parseObject(taskDefinition.getTaskParams());
- if (TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())) {
- Set<Long> depProcessDefinitionCodes = new HashSet<>();
- ObjectNode dependence = (ObjectNode) paramNode.get("dependence");
- ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
- for (int i = 0; i < dependTaskList.size(); i++) {
- ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
- ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
- for (int j = 0; j < dependItemList.size(); j++) {
- ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
- long definitionCode = dependItem.get("definitionCode").asLong();
- depProcessDefinitionCodes.add(definitionCode);
- }
- }
- if (depProcessDefinitionCodes.contains(targetProcessDefinitionCode)) {
- putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
- return result;
- }
- }
- if (TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
- long subProcessDefinitionCode = paramNode.get("processDefinitionCode").asLong();
- if (targetProcessDefinitionCode == subProcessDefinitionCode) {
- putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
- return result;
- }
- }
- updateProcessDefiniteVersion(loginUser, result, processDefinition);
- Date now = new Date();
- ProcessTaskRelation processTaskRelation = upstreamList.get(0);
- ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation);
- processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
- processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
- processTaskRelation.setUpdateTime(now);
- processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
- processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
- processTaskRelationLog.setUpdateTime(now);
- processTaskRelationLog.setOperator(loginUser.getId());
- processTaskRelationLog.setOperateTime(now);
- int update = processTaskRelationMapper.updateById(processTaskRelation);
- int updateLog = processTaskRelationLogMapper.updateById(processTaskRelationLog);
- if (update == 0 || updateLog == 0) {
- putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR);
- throw new ServiceException(Status.MOVE_PROCESS_TASK_RELATION_ERROR);
- } else {
- putMsg(result, Status.SUCCESS);
- }
- return result;
- }
-
private void updateProcessDefiniteVersion(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition) {
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
@@ -337,28 +227,25 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
- List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L);
- if (CollectionUtils.isNotEmpty(downstreamList)) {
- Set<Long> postTaskCodes = downstreamList
- .stream()
- .map(ProcessTaskRelation::getPostTaskCode)
- .collect(Collectors.toSet());
- putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
+ List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+ if (CollectionUtils.isEmpty(processTaskRelationList)) {
+ putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result;
}
- updateProcessDefiniteVersion(loginUser, result, processDefinition);
- ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
- processTaskRelationLog.setProjectCode(projectCode);
- processTaskRelationLog.setPostTaskCode(taskCode);
- processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
- processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
- processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
- int deleteRelation = processTaskRelationMapper.deleteRelation(processTaskRelationLog);
- int deleteRelationLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
- if (0 == deleteRelation || 0 == deleteRelationLog) {
- putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
- throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+ List<Long> downstreamList = Lists.newArrayList();
+ for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+ if (processTaskRelation.getPreTaskCode() == taskCode) {
+ downstreamList.add(processTaskRelation.getPostTaskCode());
+ }
+ if (processTaskRelation.getPostTaskCode() == taskCode) {
+ processTaskRelationList.remove(processTaskRelation);
+ }
+ }
+ if (CollectionUtils.isNotEmpty(downstreamList)) {
+ putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(downstreamList, ","));
+ return result;
}
+ updateRelation(loginUser, result, processDefinition, processTaskRelationList);
if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType())
|| TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
|| TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
@@ -372,6 +259,21 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
+ private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition,
+ List<ProcessTaskRelation> processTaskRelationList) {
+ updateProcessDefiniteVersion(loginUser, result, processDefinition);
+ List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+ int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
+ processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE);
+ if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+ putMsg(result, Status.SUCCESS);
+ result.put(Constants.DATA_LIST, processDefinition);
+ } else {
+ putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ }
+ }
+
/**
* delete task upstream relation
*
@@ -394,11 +296,42 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
return result;
}
- Status status = deleteUpstreamRelation(loginUser, projectCode,
- Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new), taskCode);
- if (status != Status.SUCCESS) {
- putMsg(result, status);
+ List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+ if (CollectionUtils.isEmpty(upstreamList)) {
+ putMsg(result, Status.DATA_IS_NULL, "taskCode");
+ return result;
+ }
+
+ List<Long> preTaskCodeList = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toList());
+ if (preTaskCodeList.contains(0L)) {
+ putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
+ return result;
+ }
+ ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
+ if (processDefinition == null) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode());
+ return result;
}
+ List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
+ List<ProcessTaskRelation> processTaskRelationWaitRemove = Lists.newArrayList();
+ for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+ if (preTaskCodeList.size() > 1) {
+ if (preTaskCodeList.contains(processTaskRelation.getPreTaskCode())) {
+ preTaskCodeList.remove(processTaskRelation.getPreTaskCode());
+ processTaskRelationWaitRemove.add(processTaskRelation);
+ }
+ } else {
+ if (processTaskRelation.getPostTaskCode() == taskCode) {
+ processTaskRelation.setPreTaskVersion(0);
+ processTaskRelation.setPreTaskCode(0L);
+ }
+ }
+ if (preTaskCodeList.contains(processTaskRelation.getPostTaskCode())) {
+ processTaskRelationWaitRemove.add(processTaskRelation);
+ }
+ }
+ processTaskRelationList.removeAll(processTaskRelationWaitRemove);
+ updateRelation(loginUser, result, processDefinition, processTaskRelationList);
return result;
}
@@ -424,38 +357,24 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.DATA_IS_NULL, "postTaskCodes");
return result;
}
- List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
- Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
- processTaskRelationList.stream()
- .map(ProcessTaskRelationLog::new)
- .collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode, processTaskRelationLog -> processTaskRelationLog));
- Set<Long> postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
- int delete = 0;
- int deleteLog = 0;
- Set<Long> processCodeSet = new HashSet<>();
- for (long postTaskCode : postTaskCodesSet) {
- ProcessTaskRelationLog processTaskRelationLog = taskRelationLogMap.get(postTaskCode);
- if (processTaskRelationLog != null) {
- delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
- deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
- processCodeSet.add(processTaskRelationLog.getProcessDefinitionCode());
- }
+ List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
+ if (CollectionUtils.isEmpty(downstreamList)) {
+ putMsg(result, Status.DATA_IS_NULL, "taskCode");
+ return result;
}
- for (long code : processCodeSet) {
- ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
- if (processDefinition == null) {
- throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
- }
- int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
- if (insertVersion <= 0) {
- throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
- }
+ List<Long> postTaskCodeList = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toList());
+ if (postTaskCodeList.contains(0L)) {
+ putMsg(result, Status.DATA_IS_NULL, "postTaskCodes");
+ return result;
}
- if ((delete & deleteLog) == 0) {
- throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
- } else {
- putMsg(result, Status.SUCCESS);
+ ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(downstreamList.get(0).getProcessDefinitionCode());
+ if (processDefinition == null) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, downstreamList.get(0).getProcessDefinitionCode());
+ return result;
}
+ List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
+ processTaskRelationList.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && processTaskRelation.getPreTaskCode() == taskCode);
+ updateRelation(loginUser, result, processDefinition, processTaskRelationList);
return result;
}
@@ -555,47 +474,44 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}
- List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode);
+ List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
if (CollectionUtils.isEmpty(processTaskRelationList)) {
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result;
}
- if (processTaskRelationList.size() > 1) {
- putMsg(result, Status.DATA_IS_NOT_VALID, "processTaskRelationList");
- return result;
+ Map<Long, List<ProcessTaskRelation>> taskRelationMap = new HashMap<>();
+ for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+ taskRelationMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
+ if (v == null) {
+ v = new ArrayList<>();
+ }
+ v.add(processTaskRelation);
+ return v;
+ });
}
- ProcessTaskRelation processTaskRelation = processTaskRelationList.get(0);
- int upstreamCount = processTaskRelationMapper.countByCode(projectCode, processTaskRelation.getProcessDefinitionCode(),
- 0L, processTaskRelation.getPostTaskCode());
-
- if (upstreamCount == 0) {
- putMsg(result, Status.DATA_IS_NULL, "upstreamCount");
+ if (!taskRelationMap.containsKey(postTaskCode)) {
+ putMsg(result, Status.DATA_IS_NULL, "postTaskCode");
return result;
}
- if (upstreamCount > 1) {
- int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId());
- if (delete == 0) {
- putMsg(result, Status.DELETE_EDGE_ERROR);
+ if (taskRelationMap.get(postTaskCode).size() > 1) {
+ for (ProcessTaskRelation processTaskRelation : taskRelationMap.get(postTaskCode)) {
+ if (processTaskRelation.getPreTaskCode() == preTaskCode) {
+ int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId());
+ if (delete == 0) {
+ putMsg(result, Status.DELETE_EDGE_ERROR);
+ throw new ServiceException(Status.DELETE_EDGE_ERROR);
+ }
+ processTaskRelationList.remove(processTaskRelation);
+ }
}
- return result;
- }
- updateProcessDefiniteVersion(loginUser, result, processDefinition);
- processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
- processTaskRelation.setPreTaskVersion(0);
- processTaskRelation.setPreTaskCode(0L);
- Date now = new Date();
- processTaskRelation.setUpdateTime(now);
- int update = processTaskRelationMapper.updateById(processTaskRelation);
- processTaskRelation.setId(0);
- ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
- processTaskRelationLog.setCreateTime(now);
- processTaskRelationLog.setOperator(loginUser.getId());
- processTaskRelationLog.setOperateTime(now);
- int insert = processTaskRelationLogMapper.insert(processTaskRelationLog);
- if ((update & insert) == 0) {
- putMsg(result, Status.DELETE_EDGE_ERROR);
- throw new ServiceException(Status.DELETE_EDGE_ERROR);
+ } else {
+ ProcessTaskRelation processTaskRelation = taskRelationMap.get(postTaskCode).get(0);
+ processTaskRelationList.remove(processTaskRelation);
+ processTaskRelation.setPreTaskVersion(0);
+ processTaskRelation.setPreTaskCode(0L);
+ processTaskRelationList.add(processTaskRelation);
}
+ updateRelation(loginUser, result, processDefinition, processTaskRelationList);
return result;
}
@@ -627,80 +543,4 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
};
}
-
- /**
- * delete upstream relation
- *
- * @param projectCode project code
- * @param preTaskCodes pre task codes
- * @param taskCode pre task code
- * @return status
- */
- private Status deleteUpstreamRelation(User loginUser, long projectCode, Long[] preTaskCodes, long taskCode) {
- List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes);
- if (CollectionUtils.isEmpty(upstreamList)) {
- return Status.SUCCESS;
- }
- List<ProcessTaskRelationLog> upstreamLogList = new ArrayList<>();
- Date now = new Date();
- for (ProcessTaskRelation processTaskRelation : upstreamList) {
- ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
- processTaskRelationLog.setOperator(loginUser.getId());
- processTaskRelationLog.setOperateTime(now);
- processTaskRelationLog.setUpdateTime(now);
- upstreamLogList.add(processTaskRelationLog);
- }
- Map<Long, List<ProcessTaskRelationLog>> processTaskRelationListGroupByProcessDefinitionCode = upstreamLogList.stream()
- .collect(Collectors.groupingBy(ProcessTaskRelationLog::getProcessDefinitionCode));
- // count upstream relation group by process definition code
- List<Map<String, Long>> countListGroupByProcessDefinitionCode = processTaskRelationMapper
- .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new Long[0]), taskCode);
-
- List<ProcessTaskRelationLog> deletes = new ArrayList<>();
- List<ProcessTaskRelationLog> updates = new ArrayList<>();
- for (Map<String, Long> codeCountMap : countListGroupByProcessDefinitionCode) {
- long processDefinitionCode = codeCountMap.get("processDefinitionCode");
- long countValue = codeCountMap.get("countValue");
- ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
- if (processDefinition == null) {
- return Status.PROCESS_DEFINE_NOT_EXIST;
- }
- int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
- if (insertVersion <= 0) {
- throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
- }
- List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
- if (countValue <= processTaskRelationLogList.size()) {
- ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogList.remove(0);
- if (processTaskRelationLog.getPreTaskCode() != 0) {
- processTaskRelationLog.setPreTaskCode(0);
- processTaskRelationLog.setPreTaskVersion(0);
- }
- processTaskRelationLog.setProcessDefinitionVersion(insertVersion);
- updates.add(processTaskRelationLog);
- }
- if (!processTaskRelationLogList.isEmpty()) {
- deletes.addAll(processTaskRelationLogList);
- }
- }
- deletes.addAll(updates);
- int delete = 0;
- int deleteLog = 0;
- for (ProcessTaskRelationLog processTaskRelationLog : deletes) {
- delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
- deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
- }
- if ((delete & deleteLog) == 0) {
- throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
- } else {
- if (!updates.isEmpty()) {
- int insert = processTaskRelationMapper.batchInsert(updates);
- int insertLog = processTaskRelationLogMapper.batchInsert(updates);
- if ((insert & insertLog) == 0) {
- throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
- }
- }
- }
- return Status.SUCCESS;
- }
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 749ebac..2ed0f61 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -201,7 +201,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
- if (taskDefinition.getFlag() == Flag.YES) {
+ if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
return result;
}
@@ -577,7 +577,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
}
taskDefinition.setFlag(Flag.YES);
- taskDefinitionLog.setFlag(Flag.NO);
+ taskDefinitionLog.setFlag(Flag.YES);
break;
default:
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
index 5db7868..f21de2a 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
@@ -290,48 +290,6 @@ public class ProcessTaskRelationServiceTest {
}
@Test
- public void testMoveTaskProcessRelation() {
- long projectCode = 1L;
- long processDefinitionCode = 1L;
- long taskCode = 1L;
-
- Project project = getProject(projectCode);
- Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
-
- User loginUser = new User();
- loginUser.setId(-1);
- loginUser.setUserType(UserType.GENERAL_USER);
-
- Map<String, Object> result = new HashMap<>();
- putMsg(result, Status.SUCCESS, projectCode);
- Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
- Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
- Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L)).thenReturn(Lists.newArrayList());
- Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
- List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
- ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
- processTaskRelation.setProjectCode(projectCode);
- processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
- processTaskRelation.setPreTaskCode(0L);
- processTaskRelation.setPreTaskVersion(0);
- processTaskRelation.setPostTaskCode(taskCode);
- processTaskRelation.setPostTaskVersion(1);
- processTaskRelationList.add(processTaskRelation);
- ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
- processTaskRelationLog.setProjectCode(projectCode);
- processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
- processTaskRelationLog.setPreTaskCode(0L);
- processTaskRelationLog.setPreTaskVersion(0);
- processTaskRelationLog.setPostTaskCode(taskCode);
- processTaskRelationLog.setPostTaskVersion(1);
- Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode)).thenReturn(processTaskRelationList);
- Mockito.when(processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation)).thenReturn(processTaskRelationLog);
- Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
- Mockito.when(processTaskRelationLogMapper.updateById(processTaskRelationLog)).thenReturn(1);
- Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- }
-
- @Test
public void testQueryDownstreamRelation() {
long projectCode = 1L;
long taskCode = 2L;
@@ -479,38 +437,23 @@ public class ProcessTaskRelationServiceTest {
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
+ List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ processTaskRelation.setProjectCode(projectCode);
+ processTaskRelation.setProcessDefinitionCode(1L);
+ processTaskRelation.setPreTaskCode(0L);
+ processTaskRelation.setPreTaskVersion(0);
+ processTaskRelation.setPostTaskCode(taskCode);
+ processTaskRelation.setPostTaskVersion(1);
+ processTaskRelationList.add(processTaskRelation);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
- Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(Lists.newArrayList());
- List<Map<String, Long>> countListGroupByProcessDefinitionCode = new ArrayList<>();
- countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
- {
- put("processDefinitionCode", 123L);
- put("countValue", 2L);
- }
- });
- countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
- {
- put("processDefinitionCode", 124L);
- put("countValue", 1L);
- }
- });
- countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
- {
- put("processDefinitionCode", 125L);
- put("countValue", 3L);
- }
- });
- ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
- processTaskRelationLog.setProjectCode(projectCode);
- processTaskRelationLog.setPreTaskCode(0L);
- processTaskRelationLog.setPreTaskVersion(0);
- processTaskRelationLog.setPostTaskCode(taskCode);
- processTaskRelationLog.setPostTaskVersion(2);
- Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode);
- Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
- Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
- Map<String, Object> result1 = processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode, "123", taskCode);
- Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
+ Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList);
+ Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(getProcessDefinition());
+ Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList);
+ List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+ Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
+ 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
+ Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
@@ -530,26 +473,24 @@ public class ProcessTaskRelationServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(Lists.newArrayList());
- ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
- processTaskRelationLog.setProjectCode(projectCode);
- processTaskRelationLog.setPreTaskCode(taskCode);
- processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
- Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTaskType(TaskType.CONDITIONS.getDesc());
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
- Mockito.when(taskDefinitionMapper.deleteByCode(taskCode)).thenReturn(1);
- processTaskRelationLog = new ProcessTaskRelationLog();
- processTaskRelationLog.setProjectCode(projectCode);
- processTaskRelationLog.setPostTaskCode(taskCode);
- processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
- processTaskRelationLog.setProcessDefinitionVersion(1);
- Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
- Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
- Mockito.when(processService.saveProcessDefine(loginUser, getProcessDefinition(), Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
- result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode);
+ List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ processTaskRelation.setProjectCode(projectCode);
+ processTaskRelation.setProcessDefinitionCode(1L);
+ processTaskRelation.setPreTaskCode(0L);
+ processTaskRelation.setPreTaskVersion(0);
+ processTaskRelation.setPostTaskCode(taskCode);
+ processTaskRelation.setPostTaskVersion(1);
+ processTaskRelationList.add(processTaskRelation);
+ Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)).thenReturn(processTaskRelationList);
+ List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+ Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
+ 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@@ -578,15 +519,11 @@ public class ProcessTaskRelationServiceTest {
processTaskRelationLog.setOperator(loginUser.getId());
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelation);
- Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(processTaskRelationList);
- Mockito.when(processTaskRelationMapper.countByCode(projectCode, processDefinitionCode, 0L, postTaskCode)).thenReturn(1);
- Mockito.when(processTaskRelationMapper.deleteById(processTaskRelation.getId())).thenReturn(1);
- Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
- Mockito.when(processTaskRelationLogMapper.insert(processTaskRelationLog)).thenReturn(1);
- ProcessDefinition processDefinition = getProcessDefinition();
- Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
- Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
- result = processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode);
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
+ Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList);
+ List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+ Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L,
+ 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
}