You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2021/12/28 02:10:18 UTC

[dolphinscheduler] branch 2.0.2-prepare updated: [cherry-pick-2.0.2][ApiServer] workflow copy (#7657)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
     new 6033e98  [cherry-pick-2.0.2][ApiServer] workflow copy (#7657)
6033e98 is described below

commit 6033e98a2d2668193e1a08003d8b0618ed6dc088
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Tue Dec 28 10:10:11 2021 +0800

    [cherry-pick-2.0.2][ApiServer] workflow copy (#7657)
    
    * fix 7597 (#7598)
    
    * cherry-pick #7647
    
    Co-authored-by: JinYong Li <42...@users.noreply.github.com>
---
 .../apache/dolphinscheduler/api/enums/Status.java  |  6 ++++--
 .../service/impl/ProcessDefinitionServiceImpl.java | 23 +++++++++++++++++-----
 .../service/impl/TaskDefinitionServiceImpl.java    |  8 ++++----
 3 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index c4243f0..87f658b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -267,9 +267,9 @@ public enum Status {
     EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"),
     BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"),
     IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),
-    TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"),
+    TASK_DEFINE_NOT_EXIST(50030, "task definition [{0}] does not exist", "任务定义[{0}]不存在"),
     CREATE_PROCESS_TASK_RELATION_ERROR(50032, "create process task relation error", "创建工作流任务关系错误"),
-    PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"),
+    PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation [{0}] does not exist", "工作流任务关系[{0}]不存在"),
     PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
     PROCESS_DAG_IS_EMPTY(50035, "process dag is empty", "工作流dag是空"),
     CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"),
@@ -292,6 +292,8 @@ public enum Status {
     MAIN_TABLE_USING_VERSION(50053, "the version that the master table is using", "主表正在使用该版本"),
     PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not match", "项目和工作流不匹配"),
     DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"),
+    NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"),
+    NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{}] does not support copy", "不支持复制的任务类型[{}]"),
     HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
 
     /**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 9dff036..237148c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.graph.DAG;
@@ -1308,10 +1309,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
 
     /**
      * batch move process definition
-     *
-     * @param loginUser         loginUser
-     * @param projectCode       projectCode
-     * @param codes             processDefinitionCodes
+     * Will be deleted
+     * @param loginUser loginUser
+     * @param projectCode projectCode
+     * @param codes processDefinitionCodes
      * @param targetProjectCode targetProjectCode
      */
     @Override
@@ -1378,6 +1379,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             List<ProcessTaskRelationLog> taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
             processDefinition.setProjectCode(targetProjectCode);
             if (isCopy) {
+                List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations);
+                for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+                    if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())
+                        || TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType())
+                        || TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())) {
+                        putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE, taskDefinitionLog.getTaskType());
+                        throw new ServiceException(Status.NOT_SUPPORT_COPY_TASK_TYPE);
+                    }
+                    taskDefinitionLog.setCode(0L);
+                    taskDefinitionLog.setVersion(0);
+                    taskDefinitionLog.setName(taskDefinitionLog.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
+                }
                 try {
                     processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
                 } catch (CodeGenerateException e) {
@@ -1388,7 +1401,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 processDefinition.setUserId(loginUser.getId());
                 processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
                 try {
-                    result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, Lists.newArrayList()));
+                    result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs));
                 } catch (Exception e) {
                     putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR);
                     throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 97ecb6c..3277c7a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -247,15 +247,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        if (processService.isTaskOnline(taskCode)) {
-            putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
-            return result;
-        }
         TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
         if (taskDefinition == null) {
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
             return result;
         }
+        if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
+            putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
+            return result;
+        }
         TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
         if (taskDefinitionToUpdate == null) {
             logger.error("taskDefinitionJson is not valid json");