You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/01/28 02:38:43 UTC
[incubator-dolphinscheduler] branch json_split updated:
[Feature][JsonSplit] modify processdefinition create/delete method (#4579)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new e1e48d7 [Feature][JsonSplit] modify processdefinition create/delete method (#4579)
e1e48d7 is described below
commit e1e48d7e03d3ad2c634f9ac0d1e2fc9184947c45
Author: Simon <zh...@cvte.com>
AuthorDate: Wed Jan 27 20:38:32 2021 -0600
[Feature][JsonSplit] modify processdefinition create/delete method (#4579)
* processdefinition create/delete method
* init
* add relation parse
* delete process_definition_json
---
.../service/impl/ProcessDefinitionServiceImpl.java | 112 ++++++++++++++++-----
.../dao/mapper/ProcessDefinitionLogMapper.java | 51 ++++++++++
.../dao/mapper/ProcessDefinitionMapper.java | 16 +++
.../dao/mapper/ProcessDefinitionLogMapper.xml | 48 +++++++++
.../dao/mapper/ProcessDefinitionMapper.xml | 23 ++++-
5 files changed, 221 insertions(+), 29 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 9810674..c40a2cb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -27,8 +27,10 @@ import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
+import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
+import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -54,17 +56,21 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StreamUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionVersion;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
@@ -78,6 +84,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -132,6 +139,15 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
private ProcessDefinitionVersionService processDefinitionVersionService;
@Autowired
+ private TaskDefinitionService taskDefinitionService;
+
+ @Autowired
+ private ProcessTaskRelationService processTaskRelationService;
+
+ @Autowired
+ private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
+ @Autowired
private ProcessDefinitionMapper processDefineMapper;
@Autowired
@@ -159,6 +175,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
* @return create result code
*/
@Override
+ @Transactional(rollbackFor = Exception.class)
public Map<String, Object> createProcessDefinition(User loginUser,
String projectName,
String name,
@@ -176,7 +193,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkResult;
}
- ProcessDefinition processDefine = new ProcessDefinition();
+ ProcessDefinition processDefinition = new ProcessDefinition();
Date now = new Date();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
@@ -185,42 +202,84 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkProcessJson;
}
- processDefine.setName(name);
- processDefine.setReleaseState(ReleaseState.OFFLINE);
- processDefine.setProjectId(project.getId());
- processDefine.setUserId(loginUser.getId());
- processDefine.setProcessDefinitionJson(processDefinitionJson);
- processDefine.setDescription(desc);
- processDefine.setLocations(locations);
- processDefine.setConnects(connects);
- processDefine.setTimeout(processData.getTimeout());
- processDefine.setTenantId(processData.getTenantId());
- processDefine.setModifyBy(loginUser.getUserName());
- processDefine.setResourceIds(getResourceIds(processData));
+ Long processDefinitionCode;
+ try {
+ processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
+ processDefinition.setCode(processDefinitionCode);
+ } catch (SnowFlakeException e) {
+ putMsg(result, Status.CREATE_PROCESS_DEFINITION);
+ return result;
+ }
+
+ processDefinition.setName(name);
+ processDefinition.setReleaseState(ReleaseState.OFFLINE);
+ processDefinition.setUserId(loginUser.getId());
+ processDefinition.setDescription(desc);
+ processDefinition.setLocations(locations);
+ processDefinition.setConnects(connects);
+ processDefinition.setTimeout(processData.getTimeout());
+ processDefinition.setTenantId(processData.getTenantId());
+ processDefinition.setModifyBy(loginUser.getUserName());
+ processDefinition.setResourceIds(getResourceIds(processData));
//custom global params
List<Property> globalParamsList = processData.getGlobalParams();
if (CollectionUtils.isNotEmpty(globalParamsList)) {
Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
globalParamsList = new ArrayList<>(globalParamsSet);
- processDefine.setGlobalParamList(globalParamsList);
+ processDefinition.setGlobalParamList(globalParamsList);
}
- processDefine.setCreateTime(now);
- processDefine.setUpdateTime(now);
- processDefine.setFlag(Flag.YES);
+ processDefinition.setCreateTime(now);
+ processDefinition.setUpdateTime(now);
+ processDefinition.setFlag(Flag.YES);
// save the new process definition
- processDefineMapper.insert(processDefine);
+ processDefineMapper.insert(processDefinition);
- // add process definition version
- long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefine);
+ // parse and save the taskDefinition and processTaskRelation
+ try {
+ List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
- processDefine.setVersion(version);
+ for (TaskNode task : taskNodeList) {
+ taskDefinitionService.createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task));
+ }
+
+ DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
+ Collection<String> beginNode = dag.getBeginNode();
+ Collection<String> endNode = dag.getEndNode();
+
+ // TODO: query taskCode by projectCode and taskName
- processDefineMapper.updateVersionByProcessDefinitionId(processDefine.getId(), version);
+ processTaskRelationService.createProcessTaskRelation(
+ loginUser,
+ name,
+ project.getCode(),
+ processDefinitionCode,
+ 0L,
+ 0L,
+ "0",
+ "");
+
+ } catch (Exception e) {
+ putMsg(result, Status.CREATE_PROCESS_DEFINITION);
+ return result;
+ }
+
+ // save process definition log
+ ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject(
+ JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class);
+
+ processDefinitionLog.setOperator(loginUser.getId());
+ processDefinitionLog.setOperateTime(now);
+ processDefinitionLogMapper.insert(processDefinitionLog);
+
+ // add process definition version
+ long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
+ processDefinition.setVersion(version);
+ processDefineMapper.updateVersionByProcessDefinitionId(processDefinition.getId(), version);
// return processDefinition object with ID
- result.put(Constants.DATA_LIST, processDefine.getId());
+ result.put(Constants.DATA_LIST, processDefinition.getId());
putMsg(result, Status.SUCCESS);
return result;
}
@@ -367,7 +426,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkResult;
}
- ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(),processDefinitionName);
+ ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), processDefinitionName);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName);
} else {
@@ -523,6 +582,9 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId);
+ // TODO: replace id with code
+ // ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
+
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId);
return result;
@@ -562,6 +624,8 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
}
}
+ // TODO: replace id with code
+ // int delete = processDefineMapper.deleteByCode(processDefinitionCode);
int delete = processDefineMapper.deleteById(processDefinitionId);
if (delete > 0) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
new file mode 100644
index 0000000..dcf99a2
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.mapper;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ * process definition log mapper interface
+ */
+public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinitionLog> {
+
+ /**
+ * query process definition log by name
+ *
+ * @param projectCode projectCode
+ * @param name process name
+ * @return process definition log list
+ */
+ List<ProcessDefinitionLog> queryByDefinitionName(@Param("projectCode") Long projectCode,
+ @Param("processDefinitionName") String name);
+
+ /**
+ * query process definition log list by process definition code
+ *
+ * @param processDefinitionCode processDefinitionCode
+ * @return process definition log list
+ */
+ List<ProcessDefinitionLog> queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
index 36c9887..4c61d68 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
@@ -35,6 +35,22 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
/**
+ * query process definition by code
+ *
+ * @param code code
+ * @return process definition
+ */
+ ProcessDefinition queryByCode(@Param("code") Long code);
+
+ /**
+ * delete process definition by code
+ *
+ * @param code code
+ * @return delete result
+ */
+ int deleteByCode(@Param("code") Long code);
+
+ /**
* verify process definition by name
*
* @param projectId projectId
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
new file mode 100644
index 0000000..e722cf5
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper">
+
+ <sql id="baseSql">
+ pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code,
+ pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations, pd.connects,
+ pd.warning_group_id, pd.timeout, pd.tenant_id,pd.operator, pd.operate_time, pd.create_time,
+ pd.update_time, u.user_name,p.name as project_name
+ </sql>
+
+ <select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_definition_log pd
+ JOIN t_ds_user u ON pd.user_id = u.id
+ JOIN t_ds_project p ON pd.project_code = p.code
+ WHERE p.code = #{projectCode}
+ and pd.name = #{processDefinitionName}
+ </select>
+
+ <select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_definition_log pd
+ JOIN t_ds_user u ON pd.user_id = u.id
+ JOIN t_ds_project p ON pd.project_code = p.code
+ WHERE pd.code = #{processDefinitionCode}
+ </select>
+
+</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
index a24fd29..e9a4888 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
@@ -19,21 +19,34 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper">
<sql id="baseSql">
- id
- , name, version, release_state, project_id, user_id, process_definition_json, description,
+ id, code, name, version, release_state, project_id, project_code, user_id, description,
global_params, flag, locations, connects, warning_group_id, create_time, timeout,
tenant_id, update_time, modify_by, resource_ids
</sql>
+
<select id="verifyByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
- select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.process_definition_json, pd.description,
+ select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout,
pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids
from t_ds_process_definition pd
WHERE pd.project_id = #{projectId}
and pd.name = #{processDefinitionName}
</select>
+
+ <delete id="deleteByCode">
+ delete from t_ds_process_definition
+ where code = #{code}
+ </delete>
+
+ <select id="queryByCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_definition
+ where code = #{code}
+ </select>
+
<select id="queryByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
- select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.process_definition_json, pd.description,
+ select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout,
pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids,
u.user_name,p.name as project_name,t.tenant_code,q.queue,q.queue_name
@@ -101,7 +114,7 @@
</select>
<select id="queryByDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
SELECT
- pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.process_definition_json, pd.description,
+ pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout,
pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids,
u.user_name,