You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2021/11/29 02:31:27 UTC

[dolphinscheduler] branch dev updated: [Improvement][API][num-1 & 2] Implement createEmptyProcessDefinition & updateProcessDefinitionBasicInfo (#6984)

This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f30ee59  [Improvement][API][num-1 & 2] Implement createEmptyProcessDefinition & updateProcessDefinitionBasicInfo (#6984)
f30ee59 is described below

commit f30ee5956eb33c069bc2ebaec18b555fe342be77
Author: EdwardYang <ya...@126.com>
AuthorDate: Mon Nov 29 10:30:48 2021 +0800

    [Improvement][API][num-1 & 2] Implement createEmptyProcessDefinition & updateProcessDefinitionBasicInfo (#6984)
    
    * [feature] finish interface createEmptyProcessDefinition
    
    * [feature] finish interface updateProcessDefinitionBasicInfo
    
    * [feature] use class Schedule replace ScheduleDto and delete it.
    
    * [feature] delete chinese comment.
    
    * [Improvement] change '.*' to single form of import
    
    Co-authored-by: 熠然 <ya...@cai-inc.com>
    Co-authored-by: edward-yang <ya...@gmail.com>
---
 .../service/impl/ProcessDefinitionServiceImpl.java | 204 ++++++++++++++++++++-
 1 file changed, 202 insertions(+), 2 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 1276717..2a3c14d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
 
 import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
+import org.apache.dolphinscheduler.api.dto.ScheduleParam;
 import org.apache.dolphinscheduler.api.dto.treeview.Instance;
 import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
 import org.apache.dolphinscheduler.api.enums.Status;
@@ -33,9 +34,12 @@ import org.apache.dolphinscheduler.api.utils.FileUtils;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -1533,6 +1537,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @return process definition code
      */
     @Override
+    @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> createEmptyProcessDefinition(User loginUser,
                                                             long projectCode,
                                                             String name,
@@ -1542,7 +1547,98 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                             String tenantCode,
                                                             String scheduleJson,
                                                             ProcessExecutionTypeEnum executionType) {
-        return null;
+        Project project = projectMapper.queryByCode(projectCode);
+        //check user access for project
+        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+
+        // check whether the new process define name exist
+        ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
+        if (definition != null) {
+            putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
+            return result;
+        }
+
+        int tenantId = -1;
+        if (!Constants.DEFAULT.equals(tenantCode)) {
+            Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+            if (tenant == null) {
+                putMsg(result, Status.TENANT_NOT_EXIST);
+                return result;
+            }
+            tenantId = tenant.getId();
+        }
+        long processDefinitionCode;
+        try {
+            processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
+        } catch (CodeGenerateException e) {
+            putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
+            return result;
+        }
+        ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
+                globalParams, "", timeout, loginUser.getId(), tenantId);
+        processDefinition.setExecutionType(executionType);
+        result = createEmptyDagDefine(loginUser, processDefinition);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+
+        if (scheduleJson == null || scheduleJson.trim().isEmpty()) {
+            return result;
+        }
+
+        // save dag schedule
+        Map<String, Object> scheduleResult = createDagSchedule(loginUser, projectCode, processDefinitionCode, scheduleJson);
+        if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
+            return scheduleResult;
+        }
+        return result;
+    }
+
+    private Map<String, Object> createEmptyDagDefine(User loginUser, ProcessDefinition processDefinition) {
+        Map<String, Object> result = new HashMap<>();
+        int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
+        if (insertVersion == 0) {
+            putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
+            throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
+        }
+        putMsg(result, Status.SUCCESS);
+        result.put(Constants.DATA_LIST, processDefinition);
+        return result;
+    }
+
+    private Map<String, Object> createDagSchedule(User loginUser,
+                                                  long projectCode,
+                                                  long processDefinitionCode,
+                                                  String scheduleJson) {
+        Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class);
+        // set default value
+        FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
+        WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType();
+        Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority();
+        int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId();
+        String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup();
+        Long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode();
+
+        ScheduleParam param = new ScheduleParam();
+        param.setStartTime(schedule.getStartTime());
+        param.setEndTime(schedule.getEndTime());
+        param.setCrontab(schedule.getCrontab());
+        param.setTimezoneId(schedule.getTimezoneId());
+
+        return schedulerService.insertSchedule(
+                loginUser,
+                projectCode,
+                processDefinitionCode,
+                JSONUtils.toJsonString(param),
+                warningType,
+                warningGroupId,
+                failureStrategy,
+                processInstancePriority,
+                workerGroup,
+                environmentCode);
     }
 
     /**
@@ -1571,7 +1667,111 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                                 String tenantCode,
                                                                 String scheduleJson,
                                                                 ProcessExecutionTypeEnum executionType) {
-        return null;
+        Project project = projectMapper.queryByCode(projectCode);
+        //check user access for project
+        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+
+        int tenantId = -1;
+        if (!Constants.DEFAULT.equals(tenantCode)) {
+            Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+            if (tenant == null) {
+                putMsg(result, Status.TENANT_NOT_EXIST);
+                return result;
+            }
+            tenantId = tenant.getId();
+        }
+
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
+        // check process definition exists
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
+            return result;
+        }
+        if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
+            // online can not permit edit
+            putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
+            return result;
+        }
+        if (!name.equals(processDefinition.getName())) {
+            // check whether the new process define name exist
+            ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
+            if (definition != null) {
+                putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
+                return result;
+            }
+        }
+        ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
+        processDefinition.set(projectCode, name, description, globalParams, "", timeout, tenantId);
+        processDefinition.setExecutionType(executionType);
+        result = updateDagDefineBasicInfo(loginUser, processDefinition, processDefinitionDeepCopy);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+
+        if (scheduleJson == null || scheduleJson.trim().isEmpty()) {
+            return result;
+        }
+        // update dag schedule
+        Map<String, Object> scheduleResult = updateDagSchedule(loginUser, projectCode, code, scheduleJson);
+        if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
+            return scheduleResult;
+        }
+        return result;
+    }
+
+    private Map<String, Object> updateDagDefineBasicInfo(User loginUser,
+                                                ProcessDefinition processDefinition,
+                                                ProcessDefinition processDefinitionDeepCopy) {
+        Map<String, Object> result = new HashMap<>();
+        int insertVersion;
+        if (processDefinition.equals(processDefinitionDeepCopy)) {
+            insertVersion = processDefinitionDeepCopy.getVersion();
+        } else {
+            processDefinition.setUpdateTime(new Date());
+            insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
+        }
+        if (insertVersion == 0) {
+            putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+            throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+        }
+        putMsg(result, Status.SUCCESS);
+        result.put(Constants.DATA_LIST, processDefinition);
+        return result;
+    }
+
+    private Map<String, Object> updateDagSchedule(User loginUser,
+                                                  long projectCode,
+                                                  long processDefinitionCode,
+                                                  String scheduleJson) {
+        Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class);
+        // set default value
+        FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
+        WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType();
+        Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority();
+        int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId();
+        String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup();
+        Long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode();
+
+        ScheduleParam param = new ScheduleParam();
+        param.setStartTime(schedule.getStartTime());
+        param.setEndTime(schedule.getEndTime());
+        param.setCrontab(schedule.getCrontab());
+        param.setTimezoneId(schedule.getTimezoneId());
+
+        return schedulerService.updateScheduleByProcessDefinitionCode(
+                loginUser,
+                projectCode,
+                processDefinitionCode,
+                JSONUtils.toJsonString(param),
+                warningType,
+                warningGroupId,
+                failureStrategy,
+                processInstancePriority,
+                workerGroup,
+                environmentCode);
     }
 
     /**