You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2021/11/29 02:31:27 UTC
[dolphinscheduler] branch dev updated: [Improvement][API][num-1 & 2] Implement createEmptyProcessDefinition & updateProcessDefinitionBasicInfo (#6984)
This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f30ee59 [Improvement][API][num-1 & 2] Implement createEmptyProcessDefinition & updateProcessDefinitionBasicInfo (#6984)
f30ee59 is described below
commit f30ee5956eb33c069bc2ebaec18b555fe342be77
Author: EdwardYang <ya...@126.com>
AuthorDate: Mon Nov 29 10:30:48 2021 +0800
[Improvement][API][num-1 & 2] Implement createEmptyProcessDefinition & updateProcessDefinitionBasicInfo (#6984)
* [feature] finish interface createEmptyProcessDefinition
* [feature] finish interface updateProcessDefinitionBasicInfo
* [feature] use class Schedule to replace ScheduleDto, then delete ScheduleDto.
* [feature] delete Chinese comment.
* [Improvement] change '.*' to single form of import
Co-authored-by: 熠然 <ya...@cai-inc.com>
Co-authored-by: edward-yang <ya...@gmail.com>
---
.../service/impl/ProcessDefinitionServiceImpl.java | 204 ++++++++++++++++++++-
1 file changed, 202 insertions(+), 2 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 1276717..2a3c14d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
+import org.apache.dolphinscheduler.api.dto.ScheduleParam;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
import org.apache.dolphinscheduler.api.enums.Status;
@@ -33,9 +34,12 @@ import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -1533,6 +1537,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return process definition code
*/
@Override
+ @Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> createEmptyProcessDefinition(User loginUser,
long projectCode,
String name,
@@ -1542,7 +1547,98 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
String tenantCode,
String scheduleJson,
ProcessExecutionTypeEnum executionType) {
- return null;
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+
+ // check whether the new process define name exist
+ ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
+ if (definition != null) {
+ putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
+ return result;
+ }
+
+ int tenantId = -1;
+ if (!Constants.DEFAULT.equals(tenantCode)) {
+ Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+ if (tenant == null) {
+ putMsg(result, Status.TENANT_NOT_EXIST);
+ return result;
+ }
+ tenantId = tenant.getId();
+ }
+ long processDefinitionCode;
+ try {
+ processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
+ } catch (CodeGenerateException e) {
+ putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
+ return result;
+ }
+ ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
+ globalParams, "", timeout, loginUser.getId(), tenantId);
+ processDefinition.setExecutionType(executionType);
+ result = createEmptyDagDefine(loginUser, processDefinition);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+
+ if (scheduleJson == null || scheduleJson.trim().isEmpty()) {
+ return result;
+ }
+
+ // save dag schedule
+ Map<String, Object> scheduleResult = createDagSchedule(loginUser, projectCode, processDefinitionCode, scheduleJson);
+ if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
+ return scheduleResult;
+ }
+ return result;
+ }
+
+ /**
+  * Saves a process definition through processService and wraps the outcome in a result map.
+  *
+  * @param loginUser         user on whose behalf the definition is saved
+  * @param processDefinition definition to save
+  * @return result map carrying Status.SUCCESS and the definition under Constants.DATA_LIST
+  * @throws ServiceException with CREATE_PROCESS_DEFINITION_ERROR when saveProcessDefine returns 0
+  */
+ private Map<String, Object> createEmptyDagDefine(User loginUser, ProcessDefinition processDefinition) {
+     Map<String, Object> response = new HashMap<>();
+     int savedVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
+     if (savedVersion != 0) {
+         putMsg(response, Status.SUCCESS);
+         response.put(Constants.DATA_LIST, processDefinition);
+         return response;
+     }
+     // a returned version of 0 is treated as a failed save
+     putMsg(response, Status.CREATE_PROCESS_DEFINITION_ERROR);
+     throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
+ }
+
+ /**
+  * Parses the schedule JSON, fills in defaults for missing settings and inserts the schedule
+  * through schedulerService.
+  *
+  * <p>Defaults applied when the parsed value is null (or 0 for the warning group id):
+  * FailureStrategy.CONTINUE, WarningType.NONE, Priority.MEDIUM, warning group id 1,
+  * worker group "default" and environment code -1.
+  *
+  * @param loginUser             user on whose behalf the schedule is inserted
+  * @param projectCode           code of the project owning the process definition
+  * @param processDefinitionCode code of the process definition to schedule
+  * @param scheduleJson          JSON text deserialized into a Schedule
+  * @return the result map returned by schedulerService.insertSchedule
+  * @throws ServiceException with DATA_IS_NOT_VALID when scheduleJson does not yield a Schedule
+  */
+ private Map<String, Object> createDagSchedule(User loginUser,
+         long projectCode,
+         long processDefinitionCode,
+         String scheduleJson) {
+     Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class);
+     if (schedule == null) {
+         // NOTE(review): parseObject is expected to return null for unparsable input - confirm.
+         // Fail with a meaningful status instead of a NullPointerException on the first getter;
+         // the failure stays an unchecked exception, as the NullPointerException was.
+         throw new ServiceException(Status.DATA_IS_NOT_VALID);
+     }
+     // set default value
+     FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
+     WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType();
+     Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority();
+     int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId();
+     String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup();
+     Long environmentCode = schedule.getEnvironmentCode() == null ? -1L : schedule.getEnvironmentCode();
+
+     // only the timing fields travel inside the JSON parameter; the rest are passed explicitly
+     ScheduleParam param = new ScheduleParam();
+     param.setStartTime(schedule.getStartTime());
+     param.setEndTime(schedule.getEndTime());
+     param.setCrontab(schedule.getCrontab());
+     param.setTimezoneId(schedule.getTimezoneId());
+
+     return schedulerService.insertSchedule(
+             loginUser,
+             projectCode,
+             processDefinitionCode,
+             JSONUtils.toJsonString(param),
+             warningType,
+             warningGroupId,
+             failureStrategy,
+             processInstancePriority,
+             workerGroup,
+             environmentCode);
+ }
/**
@@ -1571,7 +1667,111 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
String tenantCode,
String scheduleJson,
ProcessExecutionTypeEnum executionType) {
- return null;
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+
+ int tenantId = -1;
+ if (!Constants.DEFAULT.equals(tenantCode)) {
+ Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+ if (tenant == null) {
+ putMsg(result, Status.TENANT_NOT_EXIST);
+ return result;
+ }
+ tenantId = tenant.getId();
+ }
+
+ ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
+ // check process definition exists
+ if (processDefinition == null) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
+ return result;
+ }
+ if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
+ // online can not permit edit
+ putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
+ return result;
+ }
+ if (!name.equals(processDefinition.getName())) {
+ // check whether the new process define name exist
+ ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
+ if (definition != null) {
+ putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
+ return result;
+ }
+ }
+ ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
+ processDefinition.set(projectCode, name, description, globalParams, "", timeout, tenantId);
+ processDefinition.setExecutionType(executionType);
+ result = updateDagDefineBasicInfo(loginUser, processDefinition, processDefinitionDeepCopy);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+
+ if (scheduleJson == null || scheduleJson.trim().isEmpty()) {
+ return result;
+ }
+ // update dag schedule
+ Map<String, Object> scheduleResult = updateDagSchedule(loginUser, projectCode, code, scheduleJson);
+ if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
+ return scheduleResult;
+ }
+ return result;
+ }
+
+ /**
+  * Saves the edited process definition when it differs from its pre-edit copy and wraps the
+  * outcome in a result map.
+  *
+  * @param loginUser                 user on whose behalf the definition is saved
+  * @param processDefinition         definition carrying the edited values
+  * @param processDefinitionDeepCopy copy of the definition taken before the edit
+  * @return result map carrying Status.SUCCESS and the definition under Constants.DATA_LIST
+  * @throws ServiceException with UPDATE_PROCESS_DEFINITION_ERROR when the resulting version is 0
+  */
+ private Map<String, Object> updateDagDefineBasicInfo(User loginUser,
+         ProcessDefinition processDefinition,
+         ProcessDefinition processDefinitionDeepCopy) {
+     Map<String, Object> response = new HashMap<>();
+     boolean unchanged = processDefinition.equals(processDefinitionDeepCopy);
+     int resultingVersion;
+     if (!unchanged) {
+         processDefinition.setUpdateTime(new Date());
+         resultingVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
+     } else {
+         // nothing to save: reuse the version of the pre-edit copy
+         resultingVersion = processDefinitionDeepCopy.getVersion();
+     }
+     if (resultingVersion != 0) {
+         putMsg(response, Status.SUCCESS);
+         response.put(Constants.DATA_LIST, processDefinition);
+         return response;
+     }
+     // a version of 0 is treated as a failed update
+     putMsg(response, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+     throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ }
+
+ /**
+  * Parses the schedule JSON, fills in defaults for missing settings and updates the schedule of
+  * the given process definition through schedulerService.
+  *
+  * <p>Defaults applied when the parsed value is null (or 0 for the warning group id):
+  * FailureStrategy.CONTINUE, WarningType.NONE, Priority.MEDIUM, warning group id 1,
+  * worker group "default" and environment code -1.
+  *
+  * @param loginUser             user on whose behalf the schedule is updated
+  * @param projectCode           code of the project owning the process definition
+  * @param processDefinitionCode code of the process definition whose schedule is updated
+  * @param scheduleJson          JSON text deserialized into a Schedule
+  * @return the result map returned by schedulerService.updateScheduleByProcessDefinitionCode
+  * @throws ServiceException with DATA_IS_NOT_VALID when scheduleJson does not yield a Schedule
+  */
+ private Map<String, Object> updateDagSchedule(User loginUser,
+         long projectCode,
+         long processDefinitionCode,
+         String scheduleJson) {
+     Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class);
+     if (schedule == null) {
+         // NOTE(review): parseObject is expected to return null for unparsable input - confirm.
+         // Fail with a meaningful status instead of a NullPointerException on the first getter;
+         // the failure stays an unchecked exception, as the NullPointerException was.
+         throw new ServiceException(Status.DATA_IS_NOT_VALID);
+     }
+     // set default value
+     FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
+     WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType();
+     Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority();
+     int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId();
+     String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup();
+     Long environmentCode = schedule.getEnvironmentCode() == null ? -1L : schedule.getEnvironmentCode();
+
+     // only the timing fields travel inside the JSON parameter; the rest are passed explicitly
+     ScheduleParam param = new ScheduleParam();
+     param.setStartTime(schedule.getStartTime());
+     param.setEndTime(schedule.getEndTime());
+     param.setCrontab(schedule.getCrontab());
+     param.setTimezoneId(schedule.getTimezoneId());
+
+     return schedulerService.updateScheduleByProcessDefinitionCode(
+             loginUser,
+             projectCode,
+             processDefinitionCode,
+             JSONUtils.toJsonString(param),
+             warningType,
+             warningGroupId,
+             failureStrategy,
+             processInstancePriority,
+             workerGroup,
+             environmentCode);
+ }
/**