You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/14 06:58:03 UTC
[dolphinscheduler] branch dev updated: [Fix] openapi version inconsistency when updating workflows, tasks and relationship (#13094)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ca5af013ae [Fix] openapi version inconsistency when updating workflows, tasks and relationship (#13094)
ca5af013ae is described below
commit ca5af013ae6a2c9272ece92264060cf5ebbf80f2
Author: insist777 <84...@users.noreply.github.com>
AuthorDate: Wed Dec 14 14:57:57 2022 +0800
[Fix] openapi version inconsistency when updating workflows, tasks and relationship (#13094)
Co-authored-by: xiangzihao <46...@qq.com>
---
.../ProcessTaskRelationV2Controller.java | 3 +-
.../api/dto/workflow/WorkflowUpdateRequest.java | 2 -
.../apache/dolphinscheduler/api/enums/Status.java | 1 +
.../api/service/ProcessTaskRelationService.java | 8 +-
.../service/impl/ProcessDefinitionServiceImpl.java | 107 +++++++++++++++-
.../impl/ProcessTaskRelationServiceImpl.java | 135 +++++++++++++++++++--
.../service/impl/TaskDefinitionServiceImpl.java | 15 +--
.../api/service/ProcessDefinitionServiceTest.java | 14 ++-
.../api/service/TaskDefinitionServiceImplTest.java | 108 ++++++++++++++++-
9 files changed, 360 insertions(+), 33 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java
index 8430651220..b6798e39bf 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java
@@ -125,7 +125,8 @@ public class ProcessTaskRelationV2Controller extends BaseController {
@PathVariable("code") Long code,
@RequestBody TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
List<ProcessTaskRelation> processTaskRelations = processTaskRelationService
- .updateUpstreamTaskDefinition(loginUser, code, taskRelationUpdateUpstreamRequest);
+ .updateUpstreamTaskDefinitionWithSyncDag(loginUser, code, Boolean.TRUE,
+ taskRelationUpdateUpstreamRequest);
return Result.success(processTaskRelations);
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java
index 7e3d90d46a..c402285eff 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java
@@ -103,8 +103,6 @@ public class WorkflowUpdateRequest {
processDefinitionDeepCopy.setLocations(this.location);
}
- int version = processDefinitionDeepCopy.getVersion() + 1;
- processDefinitionDeepCopy.setVersion(version);
processDefinitionDeepCopy.setUpdateTime(new Date());
return processDefinitionDeepCopy;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index d0f82b3e99..0f1cb95b47 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -381,6 +381,7 @@ public enum Status {
"批量创建工作流任务关系 {0} 错误"),
PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error",
"批量修改工作流任务关系错误"),
+ UPSTREAM_TASK_NOT_EXISTS(50071, "upstream task want to set dependence do not exists {0}", "指定的上游任务 {0} 不存在"),
WORKFLOW_INSTANCE_IS_NOT_FINISHED(50071, "the workflow instance is not finished, can not do this operation",
"工作流实例未结束,不能执行此操作"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
index 7a31d49916..f5a1f7a082 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
@@ -86,11 +86,13 @@ public interface ProcessTaskRelationService {
*
* @param loginUser login user
* @param taskCode relation upstream code
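+ * @param needSyncDag when true, a new process definition version is saved first and the relations are stored under it; when false, the current version is kept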
* @param taskRelationUpdateUpstreamRequest relation downstream code
*/
- List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
- long taskCode,
- TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest);
+ List<ProcessTaskRelation> updateUpstreamTaskDefinitionWithSyncDag(User loginUser,
+ long taskCode,
+ Boolean needSyncDag,
+ TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest);
/**
* delete task upstream relation
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 5897807a0f..4b94af1ee7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service.impl;
+import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
@@ -2775,12 +2776,110 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
processDefinitionUpdate.setTenantId(tenant.getId());
}
- int update = processDefinitionMapper.updateById(processDefinitionUpdate);
- if (update <= 0) {
+ int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
+ if (insertVersion == 0) {
+ logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
+ processDefinitionUpdate.getCode(),
+ processDefinitionUpdate.getName());
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
- this.syncObj2Log(loginUser, processDefinition);
- return processDefinition;
+
+ int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
+ if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+ logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+ throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ }
+ logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+ processDefinitionUpdate.setVersion(insertVersion);
+ return processDefinitionUpdate;
+ }
+
+ public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
+ ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+ Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+ int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
+ processDefinitionLog.setVersion(insertVersion);
+ processDefinition.setVersion(insertVersion);
+
+ processDefinitionLog.setOperator(loginUser.getId());
+ processDefinition.setUserId(loginUser.getId());
+ processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+ processDefinition.setUpdateTime(processDefinition.getUpdateTime());
+ processDefinitionLog.setId(null);
+ int result = processDefinitionMapper.updateById(processDefinition);
+
+ int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
+ processDefinitionLog.setId(processDefinition.getId());
+ return (insertLog & result) > 0 ? insertVersion : 0;
+ }
+
+ public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,
+ int processDefinitionVersion) {
+ long projectCode = processDefinition.getProjectCode();
+ long processDefinitionCode = processDefinition.getCode();
+ List<ProcessTaskRelation> taskRelations =
+ processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+ List<ProcessTaskRelationLog> taskRelationList =
+ taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+ List<Long> taskCodeList =
+ taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+ List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList);
+ List<TaskDefinitionLog> taskDefinitionLogs =
+ taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+ if (taskRelationList.isEmpty()) {
+ return Constants.EXIT_CODE_SUCCESS;
+ }
+ Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+ if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+ taskDefinitionLogMap = taskDefinitionLogs
+ .stream()
+ .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
+ }
+ Date now = new Date();
+ for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
+ processTaskRelationLog.setProjectCode(projectCode);
+ processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+ processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+ if (taskDefinitionLogMap != null) {
+ TaskDefinitionLog preTaskDefinitionLog =
+ taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+ if (preTaskDefinitionLog != null) {
+ processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+ }
+ TaskDefinitionLog postTaskDefinitionLog =
+ taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+ if (postTaskDefinitionLog != null) {
+ processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+ }
+ }
+ processTaskRelationLog.setCreateTime(now);
+ processTaskRelationLog.setUpdateTime(now);
+ processTaskRelationLog.setOperator(loginUser.getId());
+ processTaskRelationLog.setOperateTime(now);
+ }
+ if (!taskRelations.isEmpty()) {
+ Set<Integer> processTaskRelationSet =
+ taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+ Set<Integer> taskRelationSet =
+ taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+ boolean isSame = CollectionUtils.isEqualCollection(processTaskRelationSet,
+ taskRelationSet);
+ if (isSame) {
+ logger.info("process task relations is non-existent, projectCode:{}, processCode:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode());
+ return Constants.EXIT_CODE_SUCCESS;
+ }
+ processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
+ }
+ List<ProcessTaskRelation> processTaskRelations =
+ taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList());
+ int insert = processTaskRelationMapper.batchInsert(processTaskRelations);
+ int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
+ return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
protected Map<String, Object> updateDagSchedule(User loginUser,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index df1b075d9d..4feff61e4a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service.impl;
+import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
@@ -31,12 +32,14 @@ import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
@@ -93,6 +96,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
+ @Autowired
+ private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
@Autowired
private ProcessService processService;
@@ -402,13 +408,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
*
* @param loginUser login user
* @param taskCode relation upstream code
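+ * @param needSyncDag when true, a new process definition version is saved first and the relations are stored under it; when false, the current version is kept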
* @param taskRelationUpdateUpstreamRequest relation downstream code
*/
@Override
@Transactional
- public List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser,
- long taskCode,
- TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
+ public List<ProcessTaskRelation> updateUpstreamTaskDefinitionWithSyncDag(User loginUser,
+ long taskCode,
+ Boolean needSyncDag,
+ TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
TaskDefinition downstreamTask = taskDefinitionMapper.queryByCode(taskCode);
if (downstreamTask == null) {
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode);
@@ -436,17 +444,24 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR,
taskRelationUpdateUpstreamRequest.toString());
}
-
+ processDefinition.setUpdateTime(new Date());
+ int insertVersion = processDefinition.getVersion();
+ if (needSyncDag) {
+ insertVersion =
+ this.saveProcessDefine(loginUser, processDefinition);
+ if (insertVersion <= 0) {
+ throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ }
+ }
// get new relation to create and out of date relation to delete
List<Long> taskCodeCreates = upstreamTaskCodes
.stream()
.filter(upstreamTaskCode -> processTaskRelationExists.stream().noneMatch(
processTaskRelationExist -> processTaskRelationExist.getPreTaskCode() == upstreamTaskCode))
.collect(Collectors.toList());
- List<Long> taskCodeDeletes = processTaskRelationExists
- .stream()
- .map(ProcessTaskRelation::getPreTaskCode)
- .filter(preTaskCode -> !upstreamTaskCodes.contains(preTaskCode))
+ List<Integer> taskCodeDeletes = processTaskRelationExists.stream()
+ .filter(ptr -> !upstreamTaskCodes.contains(ptr.getPreTaskCode()))
+ .map(ProcessTaskRelation::getId)
.collect(Collectors.toList());
// delete relation not exists
@@ -460,10 +475,20 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
// create relation not exists
List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
for (long createCode : taskCodeCreates) {
- TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode);
+ long upstreamCode = 0L;
+ int version = 0;
+ if (createCode != 0L) {
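+ // createCode 0 denotes the DAG root (no upstream task) and keeps code/version 0; root relations that already exist were filtered out above, so only non-zero codes need a task lookup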
+ TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode);
+ if (upstreamTask == null) {
+ throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, createCode);
+ }
+ upstreamCode = upstreamTask.getCode();
+ version = upstreamTask.getVersion();
+ }
ProcessTaskRelation processTaskRelationCreate =
new ProcessTaskRelation(null, processDefinition.getVersion(), downstreamTask.getProjectCode(),
- processDefinition.getCode(), upstreamTask.getCode(), upstreamTask.getVersion(),
+ processDefinition.getCode(), upstreamCode, version,
downstreamTask.getCode(), downstreamTask.getVersion(), null, null);
processTaskRelations.add(processTaskRelationCreate);
}
@@ -473,10 +498,98 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
// batch sync to process task relation log
- this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations);
+ int saveTaskRelationResult = saveTaskRelation(loginUser, processDefinition, insertVersion);
+ if (saveTaskRelationResult != Constants.EXIT_CODE_SUCCESS) {
+ logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+ throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ }
+ logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+ processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion);
return processTaskRelations;
}
+ public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,
+ int processDefinitionVersion) {
+ long projectCode = processDefinition.getProjectCode();
+ long processDefinitionCode = processDefinition.getCode();
+ List<ProcessTaskRelation> taskRelations =
+ processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+ List<ProcessTaskRelationLog> taskRelationList =
+ taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+
+ List<Long> taskCodeList =
+ taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
+ List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList);
+ List<TaskDefinitionLog> taskDefinitionLogs =
+ taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
+
+ if (taskRelationList.isEmpty()) {
+ return Constants.EXIT_CODE_SUCCESS;
+ }
+ Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+ if (org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+ taskDefinitionLogMap = taskDefinitionLogs
+ .stream()
+ .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
+ }
+ Date now = new Date();
+ for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
+ processTaskRelationLog.setProjectCode(projectCode);
+ processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
+ processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+ if (taskDefinitionLogMap != null) {
+ TaskDefinitionLog preTaskDefinitionLog =
+ taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+ if (preTaskDefinitionLog != null) {
+ processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
+ }
+ TaskDefinitionLog postTaskDefinitionLog =
+ taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
+ if (postTaskDefinitionLog != null) {
+ processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
+ }
+ }
+ processTaskRelationLog.setCreateTime(now);
+ processTaskRelationLog.setUpdateTime(now);
+ processTaskRelationLog.setOperator(loginUser.getId());
+ processTaskRelationLog.setOperateTime(now);
+ }
+ if (CollectionUtils.isNotEmpty(taskRelations)) {
+ Set<Integer> processTaskRelationSet =
+ taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+ Set<Integer> taskRelationSet =
+ taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+ boolean isSame = org.apache.commons.collections.CollectionUtils.isEqualCollection(processTaskRelationSet,
+ taskRelationSet);
+ if (isSame) {
+ return Constants.EXIT_CODE_SUCCESS;
+ }
+ processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
+ }
+ List<ProcessTaskRelation> processTaskRelations =
+ taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList());
+ int insert = processTaskRelationMapper.batchInsert(processTaskRelations);
+ int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
+ return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
+ }
+
+ public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
+ ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
+ Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+ int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
+ processDefinitionLog.setVersion(insertVersion);
+ processDefinitionLog.setOperator(loginUser.getId());
+ processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+ processDefinitionLog.setId(null);
+ int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
+
+ processDefinitionLog.setId(processDefinition.getId());
+ int result = processDefinitionMapper.updateById(processDefinitionLog);
+ return (insertLog & result) > 0 ? insertVersion : 0;
+ }
+
private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition,
List<ProcessTaskRelation> processTaskRelationList) {
List<ProcessTaskRelationLog> relationLogs =
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index b641d1d7f4..9abbb07253 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -214,8 +214,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
String upstreamCodes) {
TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest = new TaskRelationUpdateUpstreamRequest();
taskRelationUpdateUpstreamRequest.setWorkflowCode(workflowCode);
- taskRelationUpdateUpstreamRequest.setUpstreams(upstreamCodes);
- return processTaskRelationService.updateUpstreamTaskDefinition(user, taskCode,
+ if (upstreamCodes != null) {
+ taskRelationUpdateUpstreamRequest.setUpstreams(upstreamCodes);
+ }
+ return processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(user, taskCode, Boolean.FALSE,
taskRelationUpdateUpstreamRequest);
}
@@ -498,9 +500,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
}
- private void updateDag(User loginUser, long processDefinitionCode,
- List<ProcessTaskRelation> processTaskRelationList,
- List<TaskDefinitionLog> taskDefinitionLogs) {
+ public void updateDag(User loginUser, long processDefinitionCode,
+ List<ProcessTaskRelation> processTaskRelationList,
+ List<TaskDefinitionLog> taskDefinitionLogs) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
logger.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode);
@@ -625,6 +627,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(taskDefinitionUpdate.getProjectCode(), taskCode);
+
if (CollectionUtils.isNotEmpty(taskRelationList)) {
logger.info(
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.",
@@ -634,10 +637,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
.queryByProcessCode(taskDefinitionUpdate.getProjectCode(), processDefinitionCode);
updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog));
}
-
this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(),
taskUpdateRequest.getUpstreamTasksCodes());
-
return taskDefinitionUpdate;
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 8b600696b7..5ada289227 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -1068,11 +1068,23 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
((ServiceException) exception).getCode());
// success
+ Mockito.when(processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()))
+ .thenReturn(processDefinition.getVersion());
Mockito.when(processDefinitionMapper.updateById(isA(ProcessDefinition.class))).thenReturn(1);
ProcessDefinition processDefinitionUpdate =
processDefinitionService.updateSingleProcessDefinition(user, processDefinitionCode,
workflowUpdateRequest);
- Assertions.assertEquals(processDefinition, processDefinitionUpdate);
+ Assertions.assertNotNull(processDefinitionUpdate);
+
+ // check version
+ Assertions.assertEquals(processDefinition.getVersion() + 1, processDefinitionUpdate.getVersion());
+ }
+
+ @Test
+ public void testCheckVersion() {
+ WorkflowFilterRequest workflowFilterRequest = new WorkflowFilterRequest();
+ workflowFilterRequest.setWorkflowName(name);
+
}
@Test
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index b6569a1e67..b6668dff15 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
@@ -40,18 +41,23 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.process.ProcessServiceImpl;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import java.text.MessageFormat;
@@ -91,9 +97,18 @@ public class TaskDefinitionServiceImplTest {
@Mock
private ProjectServiceImpl projectService;
+ @InjectMocks
+ private ProcessServiceImpl processServiceImpl;
+
@Mock
private ProcessService processService;
+ @Mock
+ private ProcessDefinitionLogMapper processDefineLogMapper;
+
+ @Mock
+ private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
@@ -429,11 +444,15 @@ public class TaskDefinitionServiceImplTest {
// success
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
- Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class),
- isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList());
+ Mockito.when(
+ processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
+ isA(Boolean.class),
+ isA(TaskRelationUpdateUpstreamRequest.class)))
+ .thenReturn(getProcessTaskRelationList());
Mockito.when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class),
isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition());
Assertions.assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
+
}
@Test
@@ -500,10 +519,54 @@ public class TaskDefinitionServiceImplTest {
// success
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
- Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class),
- isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList());
+ Mockito.when(
+ processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
+ isA(Boolean.class),
+ isA(TaskRelationUpdateUpstreamRequest.class)))
+ .thenReturn(getProcessTaskRelationList());
Assertions.assertDoesNotThrow(
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
+
+ TaskDefinition taskDefinition =
+ taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest);
+ Assertions.assertEquals(getTaskDefinition().getVersion() + 1, taskDefinition.getVersion());
+ }
+
+ @Test
+ public void testUpdateDag() {
+ // updateDag should not throw once the definition and relation mappers are stubbed to succeed
+ User loginUser = getLoginUser();
+ ProcessDefinition processDefinition = getProcessDefinition();
+ processDefinition.setId(null);
+ List<ProcessTaskRelation> processTaskRelationList = getProcessTaskRelationList();
+ List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
+ taskDefinitionLogs.add(getTaskDefinitionLog());
+ Integer version = 1;
+ Mockito.when(processDefinitionMapper.queryByCode(isA(long.class))).thenReturn(processDefinition);
+
+ // saveProcessDefine: the log mapper reports max version 1, so the saved definition must get version + 1
+ Mockito.when(processDefineLogMapper.queryMaxVersionForDefinition(isA(long.class))).thenReturn(version);
+ Mockito.when(processDefineLogMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1);
+ Mockito.when(processDefinitionMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1);
+ int insertVersion =
+ processServiceImpl.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+ Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE))
+ .thenReturn(insertVersion);
+ Assertions.assertEquals(version + 1, insertVersion);
+
+ // saveTaskRelation: both relation mappers accept the batch, so the save must report success
+ List<ProcessTaskRelationLog> processTaskRelationLogList = getProcessTaskRelationLogList();
+ Mockito.when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getProjectCode()),
+ eq(processDefinition.getCode()))).thenReturn(processTaskRelationList);
+ Mockito.when(processTaskRelationMapper.batchInsert(isA(List.class))).thenReturn(1);
+ Mockito.when(processTaskRelationLogMapper.batchInsert(isA(List.class))).thenReturn(1);
+ int insertResult = processServiceImpl.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
+ processDefinition.getCode(), insertVersion, processTaskRelationLogList, taskDefinitionLogs,
+ Boolean.TRUE);
+ Assertions.assertEquals(Constants.EXIT_CODE_SUCCESS, insertResult);
+ Assertions.assertDoesNotThrow(
+ () -> taskDefinitionService.updateDag(loginUser, processDefinition.getCode(), processTaskRelationList,
+ taskDefinitionLogs));
}
@Test
@@ -528,6 +591,17 @@ public class TaskDefinitionServiceImplTest {
Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE));
}
+ /**
+ * create a GENERAL_USER-type login user (id 1) whose user name is "admin"
+ */
+ private User getLoginUser() {
+ User loginUser = new User();
+ loginUser.setUserType(UserType.GENERAL_USER);
+ loginUser.setUserName("admin");
+ loginUser.setId(1);
+ return loginUser;
+ }
+
/**
* get mock Project
*
@@ -563,6 +637,19 @@ public class TaskDefinitionServiceImplTest {
return taskDefinition;
}
+ private TaskDefinitionLog getTaskDefinitionLog() {
+ TaskDefinitionLog definitionLog = new TaskDefinitionLog();
+ definitionLog.setCode(TASK_CODE);
+ definitionLog.setVersion(VERSION);
+ definitionLog.setProjectCode(PROJECT_CODE);
+ definitionLog.setFlag(Flag.YES);
+ definitionLog.setTaskType("SHELL");
+ definitionLog.setTaskParams(TASK_PARAMETER);
+ definitionLog.setCpuQuota(RESOURCE_RATE); // CPU quota and max memory share RESOURCE_RATE
+ definitionLog.setMemoryMax(RESOURCE_RATE);
+ return definitionLog;
+ }
+
private List<ProcessTaskRelation> getProcessTaskRelationList() {
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
@@ -576,6 +663,19 @@ public class TaskDefinitionServiceImplTest {
return processTaskRelationList;
}
+ private List<ProcessTaskRelationLog> getProcessTaskRelationLogList() {
+ // single relation log: TASK_CODE -> TASK_CODE + 1 under PROCESS_DEFINITION_CODE
+ ProcessTaskRelationLog relationLog = new ProcessTaskRelationLog();
+ relationLog.setProjectCode(PROJECT_CODE);
+ relationLog.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+ relationLog.setPreTaskCode(TASK_CODE);
+ relationLog.setPostTaskCode(TASK_CODE + 1L);
+
+ List<ProcessTaskRelationLog> relationLogs = new ArrayList<>();
+ relationLogs.add(relationLog);
+ return relationLogs;
+ }
+
private List<ProcessTaskRelation> getProcessTaskRelationList2() {
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();