You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/11/06 12:49:12 UTC

[dolphinscheduler] branch dev updated: [fix-12675]edit workflow related task, workflow's task version change (#12692)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f249f2b257 [fix-12675]edit workflow related task, workflow's task version change (#12692)
f249f2b257 is described below

commit f249f2b257847787605758797669abecf1b782d8
Author: jackfanwan <61...@users.noreply.github.com>
AuthorDate: Sun Nov 6 20:49:07 2022 +0800

    [fix-12675]edit workflow related task, workflow's task version change (#12692)
    
    * edit workflow related task, workflow's task version change
    
    * Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
    
    Co-authored-by: fanwanlong <fa...@kezaihui.com>
    Co-authored-by: caishunfeng <ca...@gmail.com>
---
 .../apache/dolphinscheduler/api/enums/Status.java  |  2 ++
 .../service/impl/TaskDefinitionServiceImpl.java    | 19 +++++++++++++
 .../api/service/TaskDefinitionServiceImplTest.java | 32 ++++++++++++++++++++++
 .../dao/mapper/ProcessTaskRelationMapper.java      |  8 ++++++
 .../dao/mapper/ProcessTaskRelationMapper.xml       |  8 ++++++
 5 files changed, 69 insertions(+)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 3dae637a0e..642b08d1e8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -371,6 +371,8 @@ public enum Status {
             "批量删除工作流任务关系 {0} 错误"),
     PROCESS_TASK_RELATION_BATCH_CREATE_ERROR(50069, "batch create process task relation {0} error",
             "批量创建工作流任务关系 {0} 错误"),
+    PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error",
+            "批量修改工作流任务关系错误"),
 
     HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
     STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 1dbc91ad91..65350a325a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -765,6 +765,25 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             logger.info(
                     "Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
                     projectCode, taskCode, taskDefinitionToUpdate.getVersion());
+        // update process task relation
+        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
+                .queryByTaskCode(taskDefinitionToUpdate.getCode());
+        if (CollectionUtils.isNotEmpty(processTaskRelations)) {
+            for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+                if (taskCode == processTaskRelation.getPreTaskCode()) {
+                    processTaskRelation.setPreTaskVersion(version);
+                } else if (taskCode == processTaskRelation.getPostTaskCode()) {
+                    processTaskRelation.setPostTaskVersion(version);
+                }
+                int count = processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
+                if (count != 1) {
+                    logger.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
+                            projectCode, taskCode);
+                    putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+                    throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+                }
+            }
+        }
         return taskDefinitionToUpdate;
     }
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 0c271ace6f..a4ef76a747 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -170,8 +170,19 @@ public class TaskDefinitionServiceImplTest {
         Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
         Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
         Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
+        Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2());
+        Mockito.when(processTaskRelationMapper
+                .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
         result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+        // failure
+        Mockito.when(processTaskRelationMapper
+                .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2);
+        exception = Assertions.assertThrows(ServiceException.class,
+                () -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson));
+        Assertions.assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(),
+                ((ServiceException) exception).getCode());
+
     }
 
     @Test
@@ -531,4 +542,25 @@ public class TaskDefinitionServiceImplTest {
         processTaskRelationList.add(processTaskRelation);
         return processTaskRelationList;
     }
+
+    private List<ProcessTaskRelation> getProcessTaskRelationList2() {
+        List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        processTaskRelation.setProjectCode(PROJECT_CODE);
+        processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+        processTaskRelation.setPreTaskCode(TASK_CODE);
+        processTaskRelation.setPostTaskCode(TASK_CODE + 1L);
+
+        processTaskRelationList.add(processTaskRelation);
+
+        ProcessTaskRelation processTaskRelation2 = new ProcessTaskRelation();
+        processTaskRelation2.setProjectCode(PROJECT_CODE);
+        processTaskRelation2.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
+        processTaskRelation2.setPreTaskCode(TASK_CODE - 1);
+        processTaskRelation2.setPostTaskCode(TASK_CODE);
+        processTaskRelationList.add(processTaskRelation2);
+
+        return processTaskRelationList;
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 4549cc15d0..58a474f85b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -211,4 +211,12 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
      */
     IPage<ProcessTaskRelation> filterProcessTaskRelation(IPage<ProcessTaskRelation> page,
                                                          @Param("relation") ProcessTaskRelation processTaskRelation);
+
+    /**
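+     * update the pre task version and post task version of a single process task relation, matched by its id
+     *
+     * @param processTaskRelationList the single process task relation to update (not a list, despite the name)
+     * @return number of rows updated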
+     */
+    int updateProcessTaskRelationTaskVersion(@Param("processTaskRelation") ProcessTaskRelation processTaskRelationList);
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 0f7cab8208..ffd6957fbc 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -221,4 +221,12 @@
         </where>
         order by update_time desc, id asc
     </select>
+    <update id="updateProcessTaskRelationTaskVersion">
+            update t_ds_process_task_relation
+            set
+                pre_task_version=#{processTaskRelation.preTaskVersion},
+                post_task_version=#{processTaskRelation.postTaskVersion}
+            where
+                id = #{processTaskRelation.id}
+    </update>
 </mapper>