You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/01/25 02:44:46 UTC

[incubator-dolphinscheduler] branch json_split updated: [Feature][JsonSplit] add task and relation save code (#4546)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new 648bc41  [Feature][JsonSplit]  add task and relation save code (#4546)
648bc41 is described below

commit 648bc412a1b5f488b61a18ffa321a00cef4c74c7
Author: JinyLeeChina <42...@users.noreply.github.com>
AuthorDate: Mon Jan 25 10:44:15 2021 +0800

    [Feature][JsonSplit]  add task and relation save code (#4546)
    
    * add task and relation save code
    
    * modify codestyle
    
    Co-authored-by: JinyLeeChina <29...@qq.com>
---
 .../api/service/ProcessTaskRelationService.java    |  52 ++++++
 .../api/service/TaskDefinitionService.java         |  47 +++---
 .../impl/ProcessTaskRelationServiceImpl.java       | 108 +++++++++++++
 .../service/impl/TaskDefinitionServiceImpl.java    | 180 +++++++++++++++++++++
 .../common/enums/ConditionType.java                |  18 +++
 .../dolphinscheduler/common/enums/TaskType.java    |  20 ++-
 .../dolphinscheduler/common/utils/StringUtils.java |  28 ++++
 .../dao/entity/ProcessTaskRelation.java            |  51 +++---
 .../dao/entity/ProcessTaskRelationLog.java         |  37 ++---
 .../dao/entity/TaskDefinition.java                 |  41 +++++
 .../dao/entity/TaskDefinitionLog.java              |  21 +++
 .../dao/mapper/ProcessTaskRelationLogMapper.java   |  34 +---
 .../dao/mapper/ProcessTaskRelationMapper.java      |  43 ++---
 .../dao/mapper/ProcessTaskRelationLogMapper.xml    |  22 +++
 .../dao/mapper/ProcessTaskRelationMapper.xml       |  31 ++++
 sql/dolphinscheduler-postgre.sql                   |   1 +
 16 files changed, 599 insertions(+), 135 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
new file mode 100644
index 0000000..56ff742
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service;
+
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.Map;
+
+/**
+ * process task relation service
+ */
+public interface ProcessTaskRelationService {
+
+    /**
+     * create process task relation
+     *
+     * @param loginUser login user
+     * @param name relation name
+     * @param projectCode project code
+     * @param processDefinitionCode process definition code
+     * @param preTaskCode pre task code
+     * @param postTaskCode post task code
+     * @param conditionType condition type, given as its string form
+     * @param conditionParams condition params
+     * @return result map describing the outcome of the creation
+     */
+    Map<String, Object> createProcessTaskRelation(User loginUser,
+                                                  String name,
+                                                  Long projectCode,
+                                                  Long processDefinitionCode,
+                                                  Long preTaskCode,
+                                                  Long postTaskCode,
+                                                  String conditionType,
+                                                  String conditionParams);
+
+}
+
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
similarity index 50%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
index 3842aed..4f96001 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
@@ -15,37 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.api.service;
 
-import com.baomidou.mybatisplus.annotation.EnumValue;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
 
 /**
- * condition type
+ * task definition service
  */
-public enum ConditionType {
+public interface TaskDefinitionService {
+
     /**
-     * 0 none
-     * 1 judge
-     * 2 delay
+     * create task definition
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param taskDefinitionJson task definition json
+     * @return result map describing the outcome of the creation
+     * @throws JsonProcessingException JsonProcessingException
+     * @throws SnowFlakeException declared for failures while generating the task code
      */
-    NONE(0, "none"),
-    JUDGE(1, "judge"),
-    DELAY(2, "delay");
-
-    ConditionType(int code, String desc) {
-        this.code = code;
-        this.desc = desc;
-    }
-
-    @EnumValue
-    private final int code;
-    private final String desc;
-
-    public int getCode() {
-        return code;
-    }
-
-    public String getDesc() {
-        return desc;
-    }
+    Map<String, Object> createTaskDefinition(User loginUser,
+                                             String projectName,
+                                             String taskDefinitionJson) throws JsonProcessingException, SnowFlakeException;
 }
+
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
new file mode 100644
index 0000000..8b63a82
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
+import org.apache.dolphinscheduler.common.enums.ConditionType;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * process task relation service impl
+ */
+@Service
+public class ProcessTaskRelationServiceImpl extends BaseService implements
+        ProcessTaskRelationService {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class);
+
+    //@Autowired
+    //private ProjectMapper projectMapper;
+
+    @Autowired
+    private ProcessTaskRelationMapper processTaskRelationMapper;
+
+    @Autowired
+    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+
+    /**
+     * create process task relation
+     * <p>
+     * inserts the relation and then a matching relation log record that also
+     * carries the operator (login user id) and the operate time
+     *
+     * @param loginUser login user
+     * @param name relation name
+     * @param projectCode project code
+     * @param processDefinitionCode process definition code
+     * @param preTaskCode pre task code
+     * @param postTaskCode post task code
+     * @param conditionType condition type, resolved through ConditionType.of
+     * @param conditionParams condition params
+     * @return result map filled in through putMsg with Status.SUCCESS
+     * @throws IllegalArgumentException if conditionType matches no ConditionType description
+     */
+    @Transactional
+    @Override
+    public Map<String, Object> createProcessTaskRelation(User loginUser,
+                                                         String name,
+                                                         Long projectCode,
+                                                         Long processDefinitionCode,
+                                                         Long preTaskCode,
+                                                         Long postTaskCode,
+                                                         String conditionType,
+                                                         String conditionParams) {
+        Map<String, Object> result = new HashMap<>();
+        // TODO check projectCode
+        // TODO check processDefinitionCode
+        // TODO check preTaskCode and postTaskCode
+        // NOTE(review): the Long codes are unboxed into the constructor's long
+        // parameters, so a null code fails with a NullPointerException here
+        Date now = new Date();
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(name,
+                1, // version
+                projectCode,
+                processDefinitionCode,
+                preTaskCode,
+                postTaskCode,
+                ConditionType.of(conditionType),
+                conditionParams,
+                now,
+                now);
+        // save process task relation
+        processTaskRelationMapper.insert(processTaskRelation);
+        // save process task relation log
+        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
+        processTaskRelationLog.set(processTaskRelation);
+        processTaskRelationLog.setOperator(loginUser.getId());
+        processTaskRelationLog.setOperateTime(now);
+        processTaskRelationLogMapper.insert(processTaskRelationLog);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+}
+
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
new file mode 100644
index 0000000..4193cb0
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
+import org.apache.dolphinscheduler.api.utils.CheckUtils;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * task definition service impl
+ */
+@Service
+public class TaskDefinitionServiceImpl extends BaseService implements
+        TaskDefinitionService {
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskDefinitionServiceImpl.class);
+
+    @Autowired
+    private ProjectMapper projectMapper;
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+
+    /**
+     * create task definition
+     * <p>
+     * checks the project and the user's permission on it, parses and validates the
+     * task json, generates a task code, then inserts the task definition followed
+     * by a task definition log record carrying the operator and operate time
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param taskDefinitionJson task definition json
+     * @return the project check result when that check does not succeed; otherwise
+     *         a result map that, on success, holds the generated task code under
+     *         Constants.DATA_LIST
+     */
+    @Transactional
+    @Override
+    public Map<String, Object> createTaskDefinition(User loginUser,
+                                                    String projectName,
+                                                    String taskDefinitionJson) {
+
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByName(projectName);
+        // check project auth
+        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
+        Status resultStatus = (Status) checkResult.get(Constants.STATUS);
+        if (resultStatus != Status.SUCCESS) {
+            return checkResult;
+        }
+
+        // a null parse result is treated as invalid input
+        TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class);
+        if (taskNode == null) {
+            logger.error("taskDefinitionJson is not valid json");
+            putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
+            return result;
+        }
+        if (!CheckUtils.checkTaskNodeParameters(taskNode.getParams(), taskNode.getName())) {
+            logger.error("task node {} parameter invalid", taskNode.getName());
+            putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName());
+            return result;
+        }
+        // 0L doubles as the "no code generated" marker checked below
+        long code = 0L;
+        try {
+            code = SnowFlakeUtils.getInstance().nextId();
+        } catch (SnowFlakeException e) {
+            // only logged here; the code == 0L check below turns it into an error result
+            logger.error("Task code get error, ", e);
+        }
+        if (code == 0L) {
+            putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);// TODO code message
+            return result;
+        }
+        Date now = new Date();
+        TaskDefinition taskDefinition = new TaskDefinition(code,
+                taskNode.getName(),
+                1, // version
+                taskNode.getDesc(),
+                0L, // TODO  project.getCode()
+                loginUser.getId(),
+                TaskType.of(taskNode.getType()),
+                taskNode.getParams(),
+                taskNode.isForbidden() ? Flag.NO : Flag.YES, taskNode.getTaskInstancePriority(),
+                taskNode.getWorkerGroup(), taskNode.getMaxRetryTimes(),
+                taskNode.getRetryInterval(),
+                taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE,
+                taskNode.getTaskTimeoutParameter().getStrategy(),
+                taskNode.getTaskTimeoutParameter().getInterval(),
+                now,
+                now);
+        // resource ids are derived from the task params and set before the insert
+        taskDefinition.setResourceIds(getResourceIds(taskDefinition));
+        // save the new task definition
+        taskDefinitionMapper.insert(taskDefinition);
+        // save task definition log
+        TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
+        taskDefinitionLog.set(taskDefinition);
+        taskDefinitionLog.setOperator(loginUser.getId());
+        taskDefinitionLog.setOperateTime(now);
+        taskDefinitionLogMapper.insert(taskDefinitionLog);
+        // return taskDefinition object with code
+        result.put(Constants.DATA_LIST, code);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * get resource ids
+     * <p>
+     * collects the distinct non-zero ids of the resource files listed in the task
+     * params and joins them with ","
+     *
+     * @param taskDefinition taskDefinition
+     * @return joined resource ids, or StringUtils.EMPTY when there are none
+     */
+    private String getResourceIds(TaskDefinition taskDefinition) {
+        Set<Integer> resourceIds = null;
+        // TODO modify taskDefinition.getTaskType()
+        AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp(), taskDefinition.getTaskParams());
+
+        if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
+            resourceIds = params.getResourceFilesList().
+                    stream()
+                    .filter(t -> t.getId() != 0)
+                    .map(ResourceInfo::getId)
+                    .collect(Collectors.toSet());
+        }
+        if (CollectionUtils.isEmpty(resourceIds)) {
+            return StringUtils.EMPTY;
+        }
+        return StringUtils.join(resourceIds, ",");
+    }
+
+}
+
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java
index 3842aed..a80f1d3 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java
@@ -17,6 +17,9 @@
 
 package org.apache.dolphinscheduler.common.enums;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.baomidou.mybatisplus.annotation.EnumValue;
 
 /**
@@ -48,4 +51,19 @@ public enum ConditionType {
     public String getDesc() {
         return desc;
     }
+
+    /**
+     * lookup table from description to condition type
+     */
+    private static final Map<String, ConditionType> CONDITION_TYPE_MAP = new HashMap<>();
+
+    static {
+        for (ConditionType type : values()) {
+            CONDITION_TYPE_MAP.put(type.getDesc(), type);
+        }
+    }
+
+    /**
+     * resolve a condition type from its description
+     *
+     * @param desc condition type description
+     * @return the matching condition type
+     * @throws IllegalArgumentException if no condition type has this description
+     */
+    public static ConditionType of(String desc) {
+        ConditionType matched = CONDITION_TYPE_MAP.get(desc);
+        if (matched == null) {
+            throw new IllegalArgumentException("invalid type : " + desc);
+        }
+        return matched;
+    }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
index ae4b94b..dc3e9a5 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
@@ -16,6 +16,9 @@
  */
 package org.apache.dolphinscheduler.common.enums;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.baomidou.mybatisplus.annotation.EnumValue;
 
 /**
@@ -53,7 +56,7 @@ public enum TaskType {
     SQOOP(12, "sqoop"),
     WATERDROP(13, "waterdrop");
 
-    TaskType(int code, String descp){
+    TaskType(int code, String descp) {
         this.code = code;
         this.descp = descp;
     }
@@ -74,4 +77,19 @@ public enum TaskType {
     public String getDescp() {
         return descp;
     }
+
+    /**
+     * lookup table from description to task type
+     */
+    private static final Map<String, TaskType> TASK_TYPE_MAP = new HashMap<>();
+
+    static {
+        for (TaskType type : values()) {
+            TASK_TYPE_MAP.put(type.getDescp(), type);
+        }
+    }
+
+    /**
+     * resolve a task type from its description
+     *
+     * @param descp task type description
+     * @return the matching task type
+     * @throws IllegalArgumentException if no task type has this description
+     */
+    public static TaskType of(String descp) {
+        TaskType matched = TASK_TYPE_MAP.get(descp);
+        if (matched == null) {
+            throw new IllegalArgumentException("invalid type : " + descp);
+        }
+        return matched;
+    }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
index 6e32d12..c0df6e6 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.common.utils;
 
+import java.util.Iterator;
+
 public class StringUtils {
 
     public static final String EMPTY = "";
@@ -61,4 +63,30 @@ public class StringUtils {
     public static String trim(String str) {
         return str == null ? null : str.trim();
     }
+
+    /**
+     * join the elements of an iterable into one string, with the separator
+     * placed between consecutive elements
+     * <p>
+     * null elements contribute an empty string; a null separator is treated as
+     * no separator
+     *
+     * @param iterable elements to join, may be null
+     * @param separator text placed between elements, may be null
+     * @return the joined string, an empty string for an empty iterable,
+     *         or null when the iterable itself is null
+     */
+    public static String join(final Iterable<?> iterable, final String separator) {
+        if (iterable == null) {
+            return null;
+        }
+        final StringBuilder buf = new StringBuilder();
+        final Iterator<?> iterator = iterable.iterator();
+        while (iterator.hasNext()) {
+            // every element is appended, including the first one
+            final Object element = iterator.next();
+            if (element != null) {
+                buf.append(element);
+            }
+            // the separator only goes between elements, never after the last
+            if (separator != null && iterator.hasNext()) {
+                buf.append(separator);
+            }
+        }
+        return buf.toString();
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
index 6d1dd77..3f8b256 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
@@ -66,21 +66,11 @@ public class ProcessTaskRelation {
     private long processDefinitionCode;
 
     /**
-     * pre project code
-     */
-    private long preProjectCode;
-
-    /**
      * pre task code
      */
     private long preTaskCode;
 
     /**
-     * post project code
-     */
-    private long postProjectCode;
-
-    /**
      * post task code
      */
     private long postTaskCode;
@@ -119,6 +109,31 @@ public class ProcessTaskRelation {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
     private Date updateTime;
 
+    /**
+     * no-arg constructor
+     */
+    public ProcessTaskRelation() {
+    }
+
+    /**
+     * create a process task relation with the given field values
+     *
+     * @param name relation name
+     * @param version version
+     * @param projectCode project code
+     * @param processDefinitionCode process definition code
+     * @param preTaskCode pre task code
+     * @param postTaskCode post task code
+     * @param conditionType condition type
+     * @param conditionParams condition params
+     * @param createTime create time
+     * @param updateTime update time
+     */
+    public ProcessTaskRelation(String name,
+                               int version,
+                               long projectCode,
+                               long processDefinitionCode,
+                               long preTaskCode,
+                               long postTaskCode,
+                               ConditionType conditionType,
+                               String conditionParams,
+                               Date createTime,
+                               Date updateTime) {
+        this.name = name;
+        this.version = version;
+        this.projectCode = projectCode;
+        this.processDefinitionCode = processDefinitionCode;
+        this.preTaskCode = preTaskCode;
+        this.postTaskCode = postTaskCode;
+        this.conditionType = conditionType;
+        this.conditionParams = conditionParams;
+        this.createTime = createTime;
+        this.updateTime = updateTime;
+    }
+
     public String getName() {
         return name;
     }
@@ -210,14 +225,6 @@ public class ProcessTaskRelation {
         this.processDefinitionCode = processDefinitionCode;
     }
 
-    public long getPreProjectCode() {
-        return preProjectCode;
-    }
-
-    public void setPreProjectCode(long preProjectCode) {
-        this.preProjectCode = preProjectCode;
-    }
-
     public long getPreTaskCode() {
         return preTaskCode;
     }
@@ -226,14 +233,6 @@ public class ProcessTaskRelation {
         this.preTaskCode = preTaskCode;
     }
 
-    public long getPostProjectCode() {
-        return postProjectCode;
-    }
-
-    public void setPostProjectCode(long postProjectCode) {
-        this.postProjectCode = postProjectCode;
-    }
-
     public long getPostTaskCode() {
         return postTaskCode;
     }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
index 27f156c..e25e2df 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
@@ -66,21 +66,11 @@ public class ProcessTaskRelationLog {
     private long processDefinitionCode;
 
     /**
-     * pre project code
-     */
-    private long preProjectCode;
-
-    /**
      * pre task code
      */
     private long preTaskCode;
 
     /**
-     * post project code
-     */
-    private long postProjectCode;
-
-    /**
      * post task code
      */
     private long postTaskCode;
@@ -221,14 +211,6 @@ public class ProcessTaskRelationLog {
         this.processDefinitionCode = processDefinitionCode;
     }
 
-    public long getPreProjectCode() {
-        return preProjectCode;
-    }
-
-    public void setPreProjectCode(long preProjectCode) {
-        this.preProjectCode = preProjectCode;
-    }
-
     public long getPreTaskCode() {
         return preTaskCode;
     }
@@ -237,14 +219,6 @@ public class ProcessTaskRelationLog {
         this.preTaskCode = preTaskCode;
     }
 
-    public long getPostProjectCode() {
-        return postProjectCode;
-    }
-
-    public void setPostProjectCode(long postProjectCode) {
-        this.postProjectCode = postProjectCode;
-    }
-
     public long getPostTaskCode() {
         return postTaskCode;
     }
@@ -276,4 +250,15 @@ public class ProcessTaskRelationLog {
     public void setOperateTime(Date operateTime) {
         this.operateTime = operateTime;
     }
+
+    /**
+     * copy name, version, project code, process definition code, pre/post task
+     * code, condition type and condition params from the given relation
+     * <p>
+     * operator and operate time are not set here; callers set them separately
+     * <p>
+     * NOTE(review): create time and update time are not copied, unlike
+     * TaskDefinitionLog.set - confirm whether that is intended
+     *
+     * @param processTaskRelation relation to copy from
+     */
+    public void set(ProcessTaskRelation processTaskRelation) {
+        this.name = processTaskRelation.getName();
+        this.version = processTaskRelation.getVersion();
+        this.projectCode = processTaskRelation.getProjectCode();
+        this.processDefinitionCode = processTaskRelation.getProcessDefinitionCode();
+        this.preTaskCode = processTaskRelation.getPreTaskCode();
+        this.postTaskCode = processTaskRelation.getPostTaskCode();
+        this.conditionType = processTaskRelation.getConditionType();
+        this.conditionParams = processTaskRelation.getConditionParams();
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 1158aae..a7482cb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -167,6 +167,47 @@ public class TaskDefinition {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
     private Date updateTime;
 
+    /**
+     * no-arg constructor
+     */
+    public TaskDefinition() {
+    }
+
+    /**
+     * create a task definition with the given field values
+     * <p>
+     * resource ids are not part of this constructor
+     *
+     * @param code task code
+     * @param name task name
+     * @param version version
+     * @param description description
+     * @param projectCode project code
+     * @param userId user id
+     * @param taskType task type
+     * @param taskParams task params
+     * @param flag flag
+     * @param taskPriority task priority
+     * @param workerGroup worker group
+     * @param failRetryTimes fail retry times
+     * @param failRetryInterval fail retry interval
+     * @param timeoutFlag timeout flag
+     * @param taskTimeoutStrategy task timeout strategy
+     * @param timeout timeout
+     * @param createTime create time
+     * @param updateTime update time
+     */
+    public TaskDefinition(long code,
+                          String name,
+                          int version,
+                          String description,
+                          long projectCode,
+                          int userId,
+                          TaskType taskType,
+                          String taskParams,
+                          Flag flag,
+                          Priority taskPriority,
+                          String workerGroup,
+                          int failRetryTimes,
+                          int failRetryInterval,
+                          TimeoutFlag timeoutFlag,
+                          TaskTimeoutStrategy taskTimeoutStrategy,
+                          int timeout,
+                          Date createTime,
+                          Date updateTime) {
+        this.code = code;
+        this.name = name;
+        this.version = version;
+        this.description = description;
+        this.projectCode = projectCode;
+        this.userId = userId;
+        this.taskType = taskType;
+        this.taskParams = taskParams;
+        this.flag = flag;
+        this.taskPriority = taskPriority;
+        this.workerGroup = workerGroup;
+        this.failRetryTimes = failRetryTimes;
+        this.failRetryInterval = failRetryInterval;
+        this.timeoutFlag = timeoutFlag;
+        this.taskTimeoutStrategy = taskTimeoutStrategy;
+        this.timeout = timeout;
+        this.createTime = createTime;
+        this.updateTime = updateTime;
+    }
+
     public String getName() {
         return name;
     }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
index 1791935..439e352 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
@@ -395,4 +395,25 @@ public class TaskDefinitionLog {
     public void setResourceIds(String resourceIds) {
         this.resourceIds = resourceIds;
     }
+
+    public void set(TaskDefinition taskDefinition) { // copies the fields listed below from taskDefinition into this log
+        this.code = taskDefinition.getCode();
+        this.name = taskDefinition.getName();
+        this.version = taskDefinition.getVersion();
+        this.description = taskDefinition.getDescription();
+        this.projectCode = taskDefinition.getProjectCode();
+        this.userId = taskDefinition.getUserId();
+        this.taskType = taskDefinition.getTaskType();
+        this.taskParams = taskDefinition.getTaskParams();
+        this.flag = taskDefinition.getFlag();
+        this.taskPriority = taskDefinition.getTaskPriority();
+        this.workerGroup = taskDefinition.getWorkerGroup();
+        this.failRetryTimes = taskDefinition.getFailRetryTimes();
+        this.failRetryInterval = taskDefinition.getFailRetryInterval();
+        this.timeoutFlag = taskDefinition.getTimeoutFlag();
+        this.taskTimeoutStrategy = taskDefinition.getTaskTimeoutStrategy();
+        this.timeout = taskDefinition.getTimeout();
+        this.createTime = taskDefinition.getCreateTime();
+        this.updateTime = taskDefinition.getUpdateTime(); // NOTE(review): resourceIds is not copied here - confirm intended
+    }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
similarity index 59%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java
copy to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
index 3842aed..d2b5676 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
@@ -15,37 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.dao.mapper;
 
-import com.baomidou.mybatisplus.annotation.EnumValue;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 
 /**
- * condition type
+ * process task relation log mapper interface
  */
-public enum ConditionType {
-    /**
-     * 0 none
-     * 1 judge
-     * 2 delay
-     */
-    NONE(0, "none"),
-    JUDGE(1, "judge"),
-    DELAY(2, "delay");
-
-    ConditionType(int code, String desc) {
-        this.code = code;
-        this.desc = desc;
-    }
-
-    @EnumValue
-    private final int code;
-    private final String desc;
-
-    public int getCode() {
-        return code;
-    }
+public interface ProcessTaskRelationLogMapper extends BaseMapper<ProcessTaskRelationLog> {
 
-    public String getDesc() {
-        return desc;
-    }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
similarity index 56%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java
copy to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 3842aed..6fb6f9f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ConditionType.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -15,37 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.dao.mapper;
 
-import com.baomidou.mybatisplus.annotation.EnumValue;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 
-/**
- * condition type
- */
-public enum ConditionType {
-    /**
-     * 0 none
-     * 1 judge
-     * 2 delay
-     */
-    NONE(0, "none"),
-    JUDGE(1, "judge"),
-    DELAY(2, "delay");
+import org.apache.ibatis.annotations.Param;
 
-    ConditionType(int code, String desc) {
-        this.code = code;
-        this.desc = desc;
-    }
+import java.util.List;
 
-    @EnumValue
-    private final int code;
-    private final String desc;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 
-    public int getCode() {
-        return code;
-    }
+/**
+ * process task relation mapper interface
+ */
+public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelation> {
 
-    public String getDesc() {
-        return desc;
-    }
+    /**
+     * query the process task relations whose process_definition_code matches the given code
+     *
+     * @param processDefinitionCode code to match (a String here; NOTE(review): other codes in this change are long - confirm type)
+     * @return list of matching process task relations
+     */
+    List<ProcessTaskRelation> queryByProcessDefinitionCode(@Param("processDefinitionCode") String processDefinitionCode);
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
new file mode 100644
index 0000000..94990d4
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper">
+
+</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
new file mode 100644
index 0000000..e6192cb
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper">
+    <sql id="baseSql">
+        id, name, version, project_code, process_definition_code, pre_project_code, pre_task_code, post_project_code,
+        post_task_code, condition_type, condition_params, create_time, update_time
+    </sql> <!-- column names are left unquoted: MySQL-style backticks are a syntax error on PostgreSQL -->
+    <select id="queryByProcessDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_task_relation
+        WHERE process_definition_code = #{processDefinitionCode}
+    </select> <!-- selects every relation row whose process_definition_code equals the parameter -->
+</mapper>
diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql
index 201778e..430f03a 100644
--- a/sql/dolphinscheduler-postgre.sql
+++ b/sql/dolphinscheduler-postgre.sql
@@ -296,6 +296,7 @@ CREATE TABLE t_ds_process_definition (
   user_id int DEFAULT NULL ,
   global_params text ,
   locations text ,
+  connects text ,
   warning_group_id int4 DEFAULT NULL ,
   flag int DEFAULT NULL ,
   timeout int DEFAULT '0' ,