You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/11/06 12:49:12 UTC
[dolphinscheduler] branch dev updated: [fix-12675]edit workflow related task, workflow's task version change (#12692)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f249f2b257 [fix-12675]edit workflow related task, workflow's task version change (#12692)
f249f2b257 is described below
commit f249f2b257847787605758797669abecf1b782d8
Author: jackfanwan <61...@users.noreply.github.com>
AuthorDate: Sun Nov 6 20:49:07 2022 +0800
[fix-12675]edit workflow related task, workflow's task version change (#12692)
* edit workflow related task, workflow's task version change
* Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
Co-authored-by: fanwanlong <fa...@kezaihui.com>
Co-authored-by: caishunfeng <ca...@gmail.com>
---
.../apache/dolphinscheduler/api/enums/Status.java | 2 ++
.../service/impl/TaskDefinitionServiceImpl.java | 19 +++++++++++++
.../api/service/TaskDefinitionServiceImplTest.java | 32 ++++++++++++++++++++++
.../dao/mapper/ProcessTaskRelationMapper.java | 8 ++++++
.../dao/mapper/ProcessTaskRelationMapper.xml | 8 ++++++
5 files changed, 69 insertions(+)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 3dae637a0e..642b08d1e8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -371,6 +371,8 @@ public enum Status {
"批量删除工作流任务关系 {0} 错误"),
PROCESS_TASK_RELATION_BATCH_CREATE_ERROR(50069, "batch create process task relation {0} error",
"批量创建工作流任务关系 {0} 错误"),
+ PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error",
+ "批量修改工作流任务关系错误"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 1dbc91ad91..65350a325a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -765,6 +765,25 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
logger.info(
"Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
projectCode, taskCode, taskDefinitionToUpdate.getVersion());
+ // update process task relation
+ List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
+ .queryByTaskCode(taskDefinitionToUpdate.getCode());
+ if (CollectionUtils.isNotEmpty(processTaskRelations)) {
+ for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+ if (taskCode == processTaskRelation.getPreTaskCode()) {
+ processTaskRelation.setPreTaskVersion(version);
+ } else if (taskCode == processTaskRelation.getPostTaskCode()) {
+ processTaskRelation.setPostTaskVersion(version);
+ }
+ int count = processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
+ if (count != 1) {
+ logger.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
+ projectCode, taskCode);
+ putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+ throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+ }
+ }
+ }
return taskDefinitionToUpdate;
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 0c271ace6f..a4ef76a747 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -170,8 +170,19 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
+ Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2());
+ Mockito.when(processTaskRelationMapper
+ .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+ // failure
+ Mockito.when(processTaskRelationMapper
+ .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2);
+ exception = Assertions.assertThrows(ServiceException.class,
+ () -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson));
+ Assertions.assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(),
+ ((ServiceException) exception).getCode());
+
}
@Test
@@ -531,4 +542,25 @@ public class TaskDefinitionServiceImplTest {
processTaskRelationList.add(processTaskRelation);
return processTaskRelationList;
}
+
+ private List<ProcessTaskRelation> getProcessTaskRelationList2() {
+ List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ processTaskRelation.setProjectCode(PROJECT_CODE);
+ processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+ processTaskRelation.setPreTaskCode(TASK_CODE);
+ processTaskRelation.setPostTaskCode(TASK_CODE + 1L);
+
+ processTaskRelationList.add(processTaskRelation);
+
+ ProcessTaskRelation processTaskRelation2 = new ProcessTaskRelation();
+ processTaskRelation2.setProjectCode(PROJECT_CODE);
+ processTaskRelation2.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+ processTaskRelation2.setPreTaskCode(TASK_CODE - 1);
+ processTaskRelation2.setPostTaskCode(TASK_CODE);
+ processTaskRelationList.add(processTaskRelation2);
+
+ return processTaskRelationList;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 4549cc15d0..58a474f85b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -211,4 +211,12 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
*/
IPage<ProcessTaskRelation> filterProcessTaskRelation(IPage<ProcessTaskRelation> page,
@Param("relation") ProcessTaskRelation processTaskRelation);
+
+ /**
+ * batch update process task relation version
+ *
+ * @param processTaskRelationList process task relation list
+ * @return update num
+ */
+ int updateProcessTaskRelationTaskVersion(@Param("processTaskRelation") ProcessTaskRelation processTaskRelationList);
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 0f7cab8208..ffd6957fbc 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -221,4 +221,12 @@
</where>
order by update_time desc, id asc
</select>
+ <update id="updateProcessTaskRelationTaskVersion">
+ update t_ds_process_task_relation
+ set
+ pre_task_version=#{processTaskRelation.preTaskVersion},
+ post_task_version=#{processTaskRelation.postTaskVersion}
+ where
+ id = #{processTaskRelation.id}
+ </update>
</mapper>