You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/01/28 02:38:43 UTC

[incubator-dolphinscheduler] branch json_split updated: [Feature][JsonSplit] modify processdefinition create/delete method (#4579)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new e1e48d7  [Feature][JsonSplit] modify processdefinition create/delete method (#4579)
e1e48d7 is described below

commit e1e48d7e03d3ad2c634f9ac0d1e2fc9184947c45
Author: Simon <zh...@cvte.com>
AuthorDate: Wed Jan 27 20:38:32 2021 -0600

    [Feature][JsonSplit] modify processdefinition create/delete method (#4579)
    
    * processdefinition create/delete method
    
    * init
    
    * add relation parse
    
    * delete process_definition_json
---
 .../service/impl/ProcessDefinitionServiceImpl.java | 112 ++++++++++++++++-----
 .../dao/mapper/ProcessDefinitionLogMapper.java     |  51 ++++++++++
 .../dao/mapper/ProcessDefinitionMapper.java        |  16 +++
 .../dao/mapper/ProcessDefinitionLogMapper.xml      |  48 +++++++++
 .../dao/mapper/ProcessDefinitionMapper.xml         |  23 ++++-
 5 files changed, 221 insertions(+), 29 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 9810674..c40a2cb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -27,8 +27,10 @@ import org.apache.dolphinscheduler.api.service.BaseService;
 import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService;
 import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
+import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.service.SchedulerService;
+import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
 import org.apache.dolphinscheduler.api.utils.CheckUtils;
 import org.apache.dolphinscheduler.api.utils.FileUtils;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -54,17 +56,21 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
 import org.apache.dolphinscheduler.common.utils.StreamUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessData;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionVersion;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
@@ -78,6 +84,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -132,6 +139,15 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
     private ProcessDefinitionVersionService processDefinitionVersionService;
 
     @Autowired
+    private TaskDefinitionService taskDefinitionService;
+
+    @Autowired
+    private ProcessTaskRelationService processTaskRelationService;
+
+    @Autowired
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
+    @Autowired
     private ProcessDefinitionMapper processDefineMapper;
 
     @Autowired
@@ -159,6 +175,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
      * @return create result code
      */
     @Override
+    @Transactional(rollbackFor = Exception.class)
     public Map<String, Object> createProcessDefinition(User loginUser,
                                                        String projectName,
                                                        String name,
@@ -176,7 +193,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
             return checkResult;
         }
 
-        ProcessDefinition processDefine = new ProcessDefinition();
+        ProcessDefinition processDefinition = new ProcessDefinition();
         Date now = new Date();
 
         ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
@@ -185,42 +202,84 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
             return checkProcessJson;
         }
 
-        processDefine.setName(name);
-        processDefine.setReleaseState(ReleaseState.OFFLINE);
-        processDefine.setProjectId(project.getId());
-        processDefine.setUserId(loginUser.getId());
-        processDefine.setProcessDefinitionJson(processDefinitionJson);
-        processDefine.setDescription(desc);
-        processDefine.setLocations(locations);
-        processDefine.setConnects(connects);
-        processDefine.setTimeout(processData.getTimeout());
-        processDefine.setTenantId(processData.getTenantId());
-        processDefine.setModifyBy(loginUser.getUserName());
-        processDefine.setResourceIds(getResourceIds(processData));
+        Long processDefinitionCode;
+        try {
+            processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
+            processDefinition.setCode(processDefinitionCode);
+        } catch (SnowFlakeException e) {
+            putMsg(result, Status.CREATE_PROCESS_DEFINITION);
+            return result;
+        }
+
+        processDefinition.setName(name);
+        processDefinition.setReleaseState(ReleaseState.OFFLINE);
+        processDefinition.setUserId(loginUser.getId());
+        processDefinition.setDescription(desc);
+        processDefinition.setLocations(locations);
+        processDefinition.setConnects(connects);
+        processDefinition.setTimeout(processData.getTimeout());
+        processDefinition.setTenantId(processData.getTenantId());
+        processDefinition.setModifyBy(loginUser.getUserName());
+        processDefinition.setResourceIds(getResourceIds(processData));
 
         //custom global params
         List<Property> globalParamsList = processData.getGlobalParams();
         if (CollectionUtils.isNotEmpty(globalParamsList)) {
             Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
             globalParamsList = new ArrayList<>(globalParamsSet);
-            processDefine.setGlobalParamList(globalParamsList);
+            processDefinition.setGlobalParamList(globalParamsList);
         }
-        processDefine.setCreateTime(now);
-        processDefine.setUpdateTime(now);
-        processDefine.setFlag(Flag.YES);
+        processDefinition.setCreateTime(now);
+        processDefinition.setUpdateTime(now);
+        processDefinition.setFlag(Flag.YES);
 
         // save the new process definition
-        processDefineMapper.insert(processDefine);
+        processDefineMapper.insert(processDefinition);
 
-        // add process definition version
-        long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefine);
+        // parse and save the taskDefinition and processTaskRelation
+        try {
+            List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
 
-        processDefine.setVersion(version);
+            for (TaskNode task : taskNodeList) {
+                taskDefinitionService.createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task));
+            }
+
+            DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
+            Collection<String> beginNode = dag.getBeginNode();
+            Collection<String> endNode = dag.getEndNode();
+
+            // TODO:  query taskCode by  projectCode and taskName
 
-        processDefineMapper.updateVersionByProcessDefinitionId(processDefine.getId(), version);
+            processTaskRelationService.createProcessTaskRelation(
+                    loginUser,
+                    name,
+                    project.getCode(),
+                    processDefinitionCode,
+                    0L,
+                    0L,
+                    "0",
+                    "");
+
+        } catch (Exception e) {
+            putMsg(result, Status.CREATE_PROCESS_DEFINITION);
+            return result;
+        }
+
+        // save process definition log
+        ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject(
+                JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class);
+
+        processDefinitionLog.setOperator(loginUser.getId());
+        processDefinitionLog.setOperateTime(now);
+        processDefinitionLogMapper.insert(processDefinitionLog);
+
+        // add process definition version
+        long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
+        processDefinition.setVersion(version);
+        processDefineMapper.updateVersionByProcessDefinitionId(processDefinition.getId(), version);
 
         // return processDefinition object with ID
-        result.put(Constants.DATA_LIST, processDefine.getId());
+        result.put(Constants.DATA_LIST, processDefinition.getId());
         putMsg(result, Status.SUCCESS);
         return result;
     }
@@ -367,7 +426,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
             return checkResult;
         }
 
-        ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(),processDefinitionName);
+        ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), processDefinitionName);
         if (processDefinition == null) {
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName);
         } else {
@@ -523,6 +582,9 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
 
         ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId);
 
+        // TODO: replace id to code
+        // ProcessDefinition processDefinition = processDefineMapper.selectByCode(processDefinitionCode);
+
         if (processDefinition == null) {
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId);
             return result;
@@ -562,6 +624,8 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
             }
         }
 
+        // TODO: replace id to code
+        // ProcessDefinition processDefinition = processDefineMapper.deleteByCode(processDefinitionCode);
         int delete = processDefineMapper.deleteById(processDefinitionId);
 
         if (delete > 0) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
new file mode 100644
index 0000000..dcf99a2
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.mapper;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ * process definition log mapper interface
+ */
+public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinitionLog> {
+
+    /**
+     * query process definition log by name
+     *
+     * @param projectCode projectCode
+     * @param name process name
+     * @return process definition log list
+     */
+    List<ProcessDefinitionLog> queryByDefinitionName(@Param("projectCode") Long projectCode,
+                                        @Param("processDefinitionName") String name);
+
+    /**
+     * query process definition log list
+     *
+     * @param processDefinitionCode processDefinitionCode
+     * @return process definition log list
+     */
+    List<ProcessDefinitionLog> queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
index 36c9887..4c61d68 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
@@ -35,6 +35,22 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
 
     /**
+     * query process definition by code
+     *
+     * @param code code
+     * @return process definition
+     */
+    ProcessDefinition queryByCode(@Param("code") Long code);
+    
+    /**
+     * delete process definition by code
+     *
+     * @param code code
+     * @return delete result
+     */
+    int deleteByCode(@Param("code") Long code);
+    
+    /**
      * verify process definition by name
      *
      * @param projectId projectId
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
new file mode 100644
index 0000000..e722cf5
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper">
+
+    <sql id="baseSql">
+       pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code,
+        pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations, pd.connects,
+         pd.warning_group_id, pd.timeout, pd.tenant_id,pd.operator, pd.operate_time, pd.create_time,
+          pd.update_time, u.user_name,p.name as project_name
+    </sql>
+
+    <select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_definition_log pd
+        JOIN t_ds_user u ON pd.user_id = u.id
+        JOIN  t_ds_project p ON pd.project_code = p.code
+        WHERE p.code = #{projectCode}
+        and pd.name = #{processDefinitionName}
+    </select>
+
+    <select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_definition_log pd
+        JOIN t_ds_user u ON pd.user_id = u.id
+        JOIN  t_ds_project p ON pd.project_code = p.code
+        WHERE pd.code = #{processDefinitionCode}
+    </select>
+
+</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
index a24fd29..e9a4888 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
@@ -19,21 +19,34 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
 <mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper">
     <sql id="baseSql">
-        id
-        , name, version, release_state, project_id, user_id, process_definition_json, description,
+        id, code, name, version, release_state, project_id, project_code, user_id, description,
         global_params, flag, locations, connects, warning_group_id, create_time, timeout,
         tenant_id, update_time, modify_by, resource_ids
     </sql>
+
     <select id="verifyByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
-        select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.process_definition_json, pd.description,
+        select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.description,
         pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout,
         pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids
         from t_ds_process_definition pd
         WHERE pd.project_id = #{projectId}
         and pd.name = #{processDefinitionName}
     </select>
+
+    <delete id="deleteByCode">
+        delete from t_ds_process_definition
+        where code = #{code}
+    </delete>
+
+    <select id="queryByCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_definition
+        where code = #{code}
+    </select>
+
     <select id="queryByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
-        select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.process_definition_json, pd.description,
+        select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.description,
         pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout,
         pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids,
         u.user_name,p.name as project_name,t.tenant_code,q.queue,q.queue_name
@@ -101,7 +114,7 @@
     </select>
     <select id="queryByDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
         SELECT
-            pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.process_definition_json, pd.description,
+            pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.description,
             pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout,
             pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids,
             u.user_name,