You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/01/02 16:26:28 UTC

[dolphinscheduler] branch dev updated: [Bug][ApiServer] workflow copy (#7694) (#7762)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4d16c24  [Bug][ApiServer] workflow copy  (#7694) (#7762)
4d16c24 is described below

commit 4d16c24b6e78887927a9550a90b25bceb9b5c0b1
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Mon Jan 3 00:26:21 2022 +0800

    [Bug][ApiServer] workflow copy  (#7694) (#7762)
    
    * fix workflow copy
    
    * fix copy
    
    * fix copy
    
    * code style
---
 .../apache/dolphinscheduler/api/enums/Status.java  |  2 +-
 .../service/impl/ProcessDefinitionServiceImpl.java | 44 ++++++++++++++++++----
 2 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index e58e904..3b5427d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -294,7 +294,7 @@ public enum Status {
     PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not match", "项目和工作流不匹配"),
     DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"),
     NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"),
-    NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{}] does not support copy", "不支持复制的任务类型[{}]"),
+    NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{0}] does not support copy", "不支持复制的任务类型[{0}]"),
     HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
 
     /**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 55e73e1..ffaac26 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -107,6 +107,7 @@ import org.springframework.web.multipart.MultipartFile;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
@@ -1102,13 +1103,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
             return result;
         }
-        HashMap<Long, Project> userProjects =  new HashMap(Constants.DEFAULT_HASH_MAP_SIZE);
+        HashMap<Long, Project> userProjects =  new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE);
         projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId())
-                .forEach(userProject -> userProjects.put(userProject.getCode(), userProject));
+            .forEach(userProject -> userProjects.put(userProject.getCode(), userProject));
 
         // check processDefinition exist in project
-        List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream().
-                filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList());
+        List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream()
+            .filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList());
         if (CollectionUtils.isEmpty(processDefinitionListInProject)) {
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
             return result;
@@ -1312,6 +1313,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         List<String> failedProcessList = new ArrayList<>();
         doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, codes, result, true);
+        if (result.get(Constants.STATUS) == Status.NOT_SUPPORT_COPY_TASK_TYPE) {
+            return result;
+        }
         checkBatchOperateResult(projectCode, targetProjectCode, result, failedProcessList, true);
         return result;
     }
@@ -1389,18 +1393,35 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             processDefinition.setProjectCode(targetProjectCode);
             if (isCopy) {
                 List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations);
+                Map<Long, Long> taskCodeMap = new HashMap<>();
                 for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
                     if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())
                         || TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType())
-                        || TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())) {
+                        || TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())
+                        || TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) {
                         putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE, taskDefinitionLog.getTaskType());
-                        throw new ServiceException(Status.NOT_SUPPORT_COPY_TASK_TYPE);
+                        return;
+                    }
+                    try {
+                        long taskCode = CodeGenerateUtils.getInstance().genCode();
+                        taskCodeMap.put(taskDefinitionLog.getCode(), taskCode);
+                        taskDefinitionLog.setCode(taskCode);
+                    } catch (CodeGenerateException e) {
+                        putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
+                        throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
                     }
                     taskDefinitionLog.setProjectCode(targetProjectCode);
-                    taskDefinitionLog.setCode(0L);
                     taskDefinitionLog.setVersion(0);
                     taskDefinitionLog.setName(taskDefinitionLog.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
                 }
+                for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
+                    if (processTaskRelationLog.getPreTaskCode() > 0) {
+                        processTaskRelationLog.setPreTaskCode(taskCodeMap.get(processTaskRelationLog.getPreTaskCode()));
+                    }
+                    if (processTaskRelationLog.getPostTaskCode() > 0) {
+                        processTaskRelationLog.setPostTaskCode(taskCodeMap.get(processTaskRelationLog.getPostTaskCode()));
+                    }
+                }
                 try {
                     processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
                 } catch (CodeGenerateException e) {
@@ -1410,6 +1431,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 processDefinition.setId(0);
                 processDefinition.setUserId(loginUser.getId());
                 processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
+                if (StringUtils.isNotBlank(processDefinition.getLocations())) {
+                    ArrayNode jsonNodes = JSONUtils.parseArray(processDefinition.getLocations());
+                    for (int i = 0; i < jsonNodes.size(); i++) {
+                        ObjectNode node = (ObjectNode) jsonNodes.path(i);
+                        node.put("taskCode", taskCodeMap.get(node.get("taskCode").asLong()));
+                        jsonNodes.set(i, node);
+                    }
+                    processDefinition.setLocations(JSONUtils.toJsonString(jsonNodes));
+                }
                 try {
                     result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs));
                 } catch (Exception e) {