You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/11/07 02:29:03 UTC
[dolphinscheduler] branch 3.1.1-prepare updated: cherry-pick [fix-12675]edit workflow related task, workflow's task version change
This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch 3.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.1-prepare by this push:
new c7f990791e cherry-pick [fix-12675]edit workflow related task, workflow's task version change
c7f990791e is described below
commit c7f990791e558ba8f38a4ff27f840fff53d17a0a
Author: jackfanwan <61...@users.noreply.github.com>
AuthorDate: Sun Nov 6 20:49:07 2022 +0800
cherry-pick [fix-12675]edit workflow related task, workflow's task version change
---
.../apache/dolphinscheduler/api/enums/Status.java | 16 +++++++++
.../service/impl/TaskDefinitionServiceImpl.java | 19 ++++++++++
.../api/service/TaskDefinitionServiceImplTest.java | 41 ++++++++++++++++++----
.../dao/mapper/ProcessTaskRelationMapper.java | 17 +++++++++
.../dao/mapper/ProcessTaskRelationMapper.xml | 31 ++++++++++++++++
5 files changed, 118 insertions(+), 6 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 47bb5dbbbf..cb9a8d60a7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -311,6 +311,22 @@ public enum Status {
NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"),
BATCH_EXECUTE_PROCESS_INSTANCE_ERROR(50058, "change process instance status error: {0}", "修改工作实例状态错误: {0}"),
START_TASK_INSTANCE_ERROR(50059, "start task instance error", "运行任务流实例错误"),
+ DELETE_PROCESS_DEFINE_ERROR(50060, "delete process definition [{0}] error: {1}", "删除工作流定义[{0}]错误: {1}"),
+ CREATE_TASK_DEFINITION_LOG_ERROR(50061, "create task definition log {0} error", "创建任务操作记录 {0} 错误"),
+ DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR(50062, "delete task definition {0} error", "删除任务定义 {0} 错误"),
+ TASK_DEFINITION_NOT_CHANGE(50063, "task definition {0} do not change", "任务定义 {0} 没有变化"),
+ TASK_DEFINITION_NOT_EXISTS(50064, "task definition {0} do not exists", "任务定义 {0} 不存在"),
+ UPDATE_UPSTREAM_TASK_PROCESS_RELATION_ERROR(50065, "update task upstream relation error", "更新任务上游关系错误"),
+ CREATE_PROCESS_TASK_RELATION_LOG_ERROR(50066, "create process task relation log {0}-{1} error",
+ "创建任务关系日志 {0}-{1} 错误"),
+ PROCESS_TASK_RELATION_NOT_EXPECT(50067, "process task relation number not expect, expect {0} but get {1}",
+ "工作流任务关系数量不符合预期,预期 {0} 但是实际 {1}"),
+ PROCESS_TASK_RELATION_BATCH_DELETE_ERROR(50068, "batch delete process task relation {0} error",
+ "批量删除工作流任务关系 {0} 错误"),
+ PROCESS_TASK_RELATION_BATCH_CREATE_ERROR(50069, "batch create process task relation {0} error",
+ "批量创建工作流任务关系 {0} 错误"),
+ PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error",
+ "批量修改工作流任务关系错误"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 1d5c7f7f01..e6e57151dd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -534,6 +534,25 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
logger.info(
"Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
projectCode, taskCode, taskDefinitionToUpdate.getVersion());
+ // update process task relation
+ List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
+ .queryByTaskCode(taskDefinitionToUpdate.getCode());
+ if (CollectionUtils.isNotEmpty(processTaskRelations)) {
+ for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+ if (taskCode == processTaskRelation.getPreTaskCode()) {
+ processTaskRelation.setPreTaskVersion(version);
+ } else if (taskCode == processTaskRelation.getPostTaskCode()) {
+ processTaskRelation.setPostTaskVersion(version);
+ }
+ int count = processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
+ if (count != 1) {
+ logger.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
+ projectCode, taskCode);
+ putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+ throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+ }
+ }
+ }
return taskDefinitionToUpdate;
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 61ab0f6f7c..288223723e 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -26,10 +26,7 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
-import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@@ -140,8 +137,19 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode)).thenReturn(1);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
- result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskCode, taskDefinitionJson);
- Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+ Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2());
+ Mockito.when(processTaskRelationMapper
+ .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
+ result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
+ Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+ // failure
+ Mockito.when(processTaskRelationMapper
+ .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2);
+ exception = Assertions.assertThrows(ServiceException.class,
+ () -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson));
+ Assertions.assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(),
+ ((ServiceException) exception).getCode());
+
}
@Test
@@ -336,4 +344,25 @@ public class TaskDefinitionServiceImplTest {
Map<String, Object> failResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.getEnum(2));
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS));
}
+
+ private List<ProcessTaskRelation> getProcessTaskRelationList2() {
+ List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ processTaskRelation.setProjectCode(PROJECT_CODE);
+ processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+ processTaskRelation.setPreTaskCode(TASK_CODE);
+ processTaskRelation.setPostTaskCode(TASK_CODE + 1L);
+
+ processTaskRelationList.add(processTaskRelation);
+
+ ProcessTaskRelation processTaskRelation2 = new ProcessTaskRelation();
+ processTaskRelation2.setProjectCode(PROJECT_CODE);
+ processTaskRelation2.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+ processTaskRelation2.setPreTaskCode(TASK_CODE - 1);
+ processTaskRelation2.setPostTaskCode(TASK_CODE);
+ processTaskRelationList.add(processTaskRelation2);
+
+ return processTaskRelationList;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 4332b54960..7d1d18b169 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.mapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@@ -197,5 +198,21 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
*/
List<ProcessTaskRelation> queryDownstreamByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
+ /**
+ * Filter process task relation
+ *
+ * @param page page
+ * @param processTaskRelation process definition object
+ * @return process task relation IPage
+ */
+ IPage<ProcessTaskRelation> filterProcessTaskRelation(IPage<ProcessTaskRelation> page,
+ @Param("relation") ProcessTaskRelation processTaskRelation);
+ /**
+ * batch update process task relation version
+ *
+ * @param processTaskRelationList process task relation list
+ * @return update num
+ */
+ int updateProcessTaskRelationTaskVersion(@Param("processTaskRelation") ProcessTaskRelation processTaskRelationList);
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 2a1892db2b..8afa0433c2 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -200,4 +200,35 @@
and post_task_code = #{postTaskCode}
</if>
</select>
+
+ <select id="filterProcessTaskRelation"
+ parameterType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation"
+ resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+ SELECT
+ <include refid="baseSql"/>
+ FROM t_ds_process_task_relation
+ <where>
+ <if test=" relation.projectCode != 0">
+ and project_code = #{relation.projectCode}
+ </if>
+ <if test=" relation.processDefinitionCode != 0">
+ and process_definition_code = #{relation.processDefinitionCode}
+ </if>
+ <if test=" relation.preTaskCode != 0">
+ and pre_task_code = #{relation.preTaskCode}
+ </if>
+ <if test=" relation.postTaskCode != 0">
+ and post_task_code = #{relation.postTaskCode}
+ </if>
+ </where>
+ order by update_time desc, id asc
+ </select>
+ <update id="updateProcessTaskRelationTaskVersion">
+ update t_ds_process_task_relation
+ set
+ pre_task_version=#{processTaskRelation.preTaskVersion},
+ post_task_version=#{processTaskRelation.postTaskVersion}
+ where
+ id = #{processTaskRelation.id}
+ </update>
</mapper>