You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/11/07 02:29:03 UTC

[dolphinscheduler] branch 3.1.1-prepare updated: cherry-pick [fix-12675]edit workflow related task, workflow's task version change

This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.1-prepare by this push:
     new c7f990791e cherry-pick [fix-12675]edit workflow related task, workflow's task version change
c7f990791e is described below

commit c7f990791e558ba8f38a4ff27f840fff53d17a0a
Author: jackfanwan <61...@users.noreply.github.com>
AuthorDate: Sun Nov 6 20:49:07 2022 +0800

    cherry-pick [fix-12675]edit workflow related task, workflow's task version change
---
 .../apache/dolphinscheduler/api/enums/Status.java  | 16 +++++++++
 .../service/impl/TaskDefinitionServiceImpl.java    | 19 ++++++++++
 .../api/service/TaskDefinitionServiceImplTest.java | 41 ++++++++++++++++++----
 .../dao/mapper/ProcessTaskRelationMapper.java      | 17 +++++++++
 .../dao/mapper/ProcessTaskRelationMapper.xml       | 31 ++++++++++++++++
 5 files changed, 118 insertions(+), 6 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 47bb5dbbbf..cb9a8d60a7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -311,6 +311,22 @@ public enum Status {
     NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"),
     BATCH_EXECUTE_PROCESS_INSTANCE_ERROR(50058, "change process instance status error: {0}", "修改工作实例状态错误: {0}"),
     START_TASK_INSTANCE_ERROR(50059, "start task instance error", "运行任务流实例错误"),
+    DELETE_PROCESS_DEFINE_ERROR(50060, "delete process definition [{0}] error: {1}", "删除工作流定义[{0}]错误: {1}"),
+    CREATE_TASK_DEFINITION_LOG_ERROR(50061, "create task definition log {0} error", "创建任务操作记录 {0} 错误"),
+    DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR(50062, "delete task definition {0} error", "删除任务定义 {0} 错误"),
+    TASK_DEFINITION_NOT_CHANGE(50063, "task definition {0} do not change", "任务定义 {0} 没有变化"),
+    TASK_DEFINITION_NOT_EXISTS(50064, "task definition {0} do not exists", "任务定义 {0} 不存在"),
+    UPDATE_UPSTREAM_TASK_PROCESS_RELATION_ERROR(50065, "update task upstream relation error", "更新任务上游关系错误"),
+    CREATE_PROCESS_TASK_RELATION_LOG_ERROR(50066, "create process task relation log {0}-{1} error",
+            "创建任务关系日志 {0}-{1} 错误"),
+    PROCESS_TASK_RELATION_NOT_EXPECT(50067, "process task relation number not expect, expect {0} but get {1}",
+            "工作流任务关系数量不符合预期,预期 {0} 但是实际 {1}"),
+    PROCESS_TASK_RELATION_BATCH_DELETE_ERROR(50068, "batch delete process task relation {0} error",
+            "批量删除工作流任务关系 {0} 错误"),
+    PROCESS_TASK_RELATION_BATCH_CREATE_ERROR(50069, "batch create process task relation {0} error",
+            "批量创建工作流任务关系 {0} 错误"),
+    PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error",
+            "批量修改工作流任务关系错误"),
 
     HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
     STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 1d5c7f7f01..e6e57151dd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -534,6 +534,25 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             logger.info(
                     "Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
                     projectCode, taskCode, taskDefinitionToUpdate.getVersion());
+        // update process task relation
+        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
+                .queryByTaskCode(taskDefinitionToUpdate.getCode());
+        if (CollectionUtils.isNotEmpty(processTaskRelations)) {
+            for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+                if (taskCode == processTaskRelation.getPreTaskCode()) {
+                    processTaskRelation.setPreTaskVersion(version);
+                } else if (taskCode == processTaskRelation.getPostTaskCode()) {
+                    processTaskRelation.setPostTaskVersion(version);
+                }
+                int count = processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
+                if (count != 1) {
+                    logger.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
+                            projectCode, taskCode);
+                    putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+                    throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+                }
+            }
+        }
         return taskDefinitionToUpdate;
     }
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 61ab0f6f7c..288223723e 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -26,10 +26,7 @@ import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
-import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@@ -140,8 +137,19 @@ public class TaskDefinitionServiceImplTest {
         Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
         Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode)).thenReturn(1);
         Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
-        result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskCode, taskDefinitionJson);
-        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+        Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2());
+        Mockito.when(processTaskRelationMapper
+                .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
+        result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
+        Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+        // failure
+        Mockito.when(processTaskRelationMapper
+                .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson));
+        Assertions.assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(),
+                ((ServiceException) exception).getCode());
+
     }
 
     @Test
@@ -336,4 +344,25 @@ public class TaskDefinitionServiceImplTest {
         Map<String, Object> failResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.getEnum(2));
         Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS));
     }
+    /** Test fixture: two relations in PROJECT_CODE / PROCESS_DEFINITION_CODE, with TASK_CODE as the pre task of the first and the post task of the second. */
+    private List<ProcessTaskRelation> getProcessTaskRelationList2() {
+        List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        processTaskRelation.setProjectCode(PROJECT_CODE);
+        processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+        processTaskRelation.setPreTaskCode(TASK_CODE);
+        processTaskRelation.setPostTaskCode(TASK_CODE + 1L);
+
+        processTaskRelationList.add(processTaskRelation);
+
+        ProcessTaskRelation processTaskRelation2 = new ProcessTaskRelation();
+        processTaskRelation2.setProjectCode(PROJECT_CODE);
+        processTaskRelation2.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+        processTaskRelation2.setPreTaskCode(TASK_CODE - 1);
+        processTaskRelation2.setPostTaskCode(TASK_CODE);
+        processTaskRelationList.add(processTaskRelation2);
+
+        return processTaskRelationList;
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 4332b54960..7d1d18b169 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.dao.mapper;
 
+import com.baomidou.mybatisplus.core.metadata.IPage;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 
@@ -197,5 +198,21 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
      */
     List<ProcessTaskRelation> queryDownstreamByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
 
+    /**
+     * Query process task relations that match the given filter object.
+     *
+     * @param page pagination parameter
+     * @param processTaskRelation filter bound as "relation"; the mapper XML skips each of its four code fields that equals 0
+     * @return IPage of matching relations, ordered by update_time desc, then id asc
+     */
+    IPage<ProcessTaskRelation> filterProcessTaskRelation(IPage<ProcessTaskRelation> page,
+                                                         @Param("relation") ProcessTaskRelation processTaskRelation);
 
+    /**
+     * Update pre_task_version and post_task_version of the single relation row matched by id.
+     *
+     * @param processTaskRelationList one relation (not a list, despite the name), bound as "processTaskRelation"
+     * @return number of rows updated
+     */
+    int updateProcessTaskRelationTaskVersion(@Param("processTaskRelation") ProcessTaskRelation processTaskRelationList);
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 2a1892db2b..8afa0433c2 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -200,4 +200,35 @@
             and post_task_code = #{postTaskCode}
         </if>
     </select>
+    <!-- Selects relations filtered by whichever of the four codes on "relation" are non-zero; most recently updated first. -->
+    <select id="filterProcessTaskRelation"
+            parameterType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation"
+            resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+        SELECT
+        <include refid="baseSql"/>
+        FROM t_ds_process_task_relation
+        <where>
+            <if test=" relation.projectCode != 0">
+                and project_code = #{relation.projectCode}
+            </if>
+            <if test=" relation.processDefinitionCode != 0">
+                and process_definition_code = #{relation.processDefinitionCode}
+            </if>
+            <if test=" relation.preTaskCode != 0">
+                and pre_task_code = #{relation.preTaskCode}
+            </if>
+            <if test=" relation.postTaskCode != 0">
+                and post_task_code = #{relation.postTaskCode}
+            </if>
+        </where>
+        order by update_time desc, id asc
+    </select>
+    <update id="updateProcessTaskRelationTaskVersion"> <!-- overwrites both version columns on the row whose id matches; both values come from the parameter -->
+            update t_ds_process_task_relation
+            set
+                pre_task_version=#{processTaskRelation.preTaskVersion},
+                post_task_version=#{processTaskRelation.postTaskVersion}
+            where
+                id = #{processTaskRelation.id}
+    </update>
 </mapper>