You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2021/08/06 07:01:35 UTC

[dolphinscheduler] branch json_split_two updated: [Feature][JsonSplit-api] update ProcessInstance api (#5953)

This is an automated email from the ASF dual-hosted git repository.

wenhemin pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split_two by this push:
     new 595205c  [Feature][JsonSplit-api] update ProcessInstance api (#5953)
595205c is described below

commit 595205cbe117dcd76bc7731eb84180f22e2b457d
Author: JinyLeeChina <42...@users.noreply.github.com>
AuthorDate: Fri Aug 6 15:01:27 2021 +0800

    [Feature][JsonSplit-api] update ProcessInstance api (#5953)
    
    * check whether ProcessDefinition has a cycle
    
    * checkProcessNode of processDefinition
    
    * TaskInstance api
    
    * Replace projectName with projectCode in ProcessInstance api
    
    * ProcessInstance api
    
    * ProcessInstance api
    
    Co-authored-by: JinyLeeChina <29...@qq.com>
---
 .../api/controller/ProcessInstanceController.java  |  34 +--
 .../api/service/ProcessInstanceService.java        |  20 +-
 .../service/impl/ProcessInstanceServiceImpl.java   | 118 +++++-----
 .../ProcessDefinitionControllerTest.java           |   4 +-
 .../controller/ProcessInstanceControllerTest.java  |   8 +-
 .../api/service/ProcessInstanceServiceTest.java    |  45 ++--
 .../dao/entity/ProcessDefinition.java              |  16 --
 .../dao/entity/ProcessInstance.java                |  16 +-
 .../service/process/ProcessService.java            | 253 +--------------------
 .../service/process/ProcessServiceTest.java        | 102 ---------
 sql/dolphinscheduler_postgre.sql                   |   2 -
 11 files changed, 123 insertions(+), 495 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index a3903a4..deba09a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -42,7 +42,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 
 import java.io.IOException;
-import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -88,7 +87,7 @@ public class ProcessInstanceController extends BaseController {
      * @param projectCode project code
      * @param pageNo page number
      * @param pageSize page size
-     * @param processDefinitionId process definition id
+     * @param processDefineCode process definition code
      * @param searchVal search value
      * @param stateType state type
      * @param host host
@@ -98,7 +97,7 @@ public class ProcessInstanceController extends BaseController {
      */
     @ApiOperation(value = "queryProcessInstanceList", notes = "QUERY_PROCESS_INSTANCE_LIST_NOTES")
     @ApiImplicitParams({
-        @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", dataType = "Int", example = "100"),
+        @ApiImplicitParam(name = "processDefiniteCode", value = "PROCESS_DEFINITION_CODE", dataType = "Long", example = "100"),
         @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", type = "String"),
         @ApiImplicitParam(name = "executorName", value = "EXECUTOR_NAME", type = "String"),
         @ApiImplicitParam(name = "stateType", value = "EXECUTION_STATUS", type = "ExecutionStatus"),
@@ -114,7 +113,7 @@ public class ProcessInstanceController extends BaseController {
     @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
     public Result queryProcessInstanceList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                            @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
-                                           @RequestParam(value = "processDefinitionId", required = false, defaultValue = "0") Integer processDefinitionId,
+                                           @RequestParam(value = "processDefineCode", required = false, defaultValue = "0") long processDefineCode,
                                            @RequestParam(value = "searchVal", required = false) String searchVal,
                                            @RequestParam(value = "executorName", required = false) String executorName,
                                            @RequestParam(value = "stateType", required = false) ExecutionStatus stateType,
@@ -128,7 +127,7 @@ public class ProcessInstanceController extends BaseController {
             return returnDataListPaging(result);
         }
         searchVal = ParameterUtils.handleEscapes(searchVal);
-        result = processInstanceService.queryProcessInstanceList(loginUser, projectCode, processDefinitionId, startTime, endTime,
+        result = processInstanceService.queryProcessInstanceList(loginUser, projectCode, processDefineCode, startTime, endTime,
             searchVal, executorName, stateType, host, pageNo, pageSize);
         return returnDataListPaging(result);
     }
@@ -161,22 +160,26 @@ public class ProcessInstanceController extends BaseController {
      *
      * @param loginUser login user
      * @param projectCode project code
-     * @param processInstanceJson process instance json
+     * @param taskRelationJson process task relation json
      * @param processInstanceId process instance id
      * @param scheduleTime schedule time
      * @param syncDefine sync define
      * @param flag flag
      * @param locations locations
+     * @param tenantCode tenant code
      * @return update result code
      */
     @ApiOperation(value = "updateProcessInstance", notes = "UPDATE_PROCESS_INSTANCE_NOTES")
     @ApiImplicitParams({
-        @ApiImplicitParam(name = "processInstanceJson", value = "PROCESS_INSTANCE_JSON", type = "String"),
+        @ApiImplicitParam(name = "taskRelationJson", value = "TASK_RELATION_JSON", type = "String"),
         @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
         @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", type = "String"),
         @ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required = true, type = "Boolean"),
+        @ApiImplicitParam(name = "globalParams", value = "PROCESS_GLOBAL_PARAMS", type = "String"),
         @ApiImplicitParam(name = "locations", value = "PROCESS_INSTANCE_LOCATIONS", type = "String"),
-        @ApiImplicitParam(name = "flag", value = "RECOVERY_PROCESS_INSTANCE_FLAG", type = "Flag"),
+        @ApiImplicitParam(name = "timeout", value = "PROCESS_TIMEOUT", type = "String"),
+        @ApiImplicitParam(name = "tenantCode", value = "TENANT_CODE", type = "Int", example = "0"),
+        @ApiImplicitParam(name = "flag", value = "RECOVERY_PROCESS_INSTANCE_FLAG", type = "Flag")
     })
     @PostMapping(value = "/update")
     @ResponseStatus(HttpStatus.OK)
@@ -184,14 +187,17 @@ public class ProcessInstanceController extends BaseController {
     @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
     public Result updateProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                         @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
-                                        @RequestParam(value = "processInstanceJson", required = false) String processInstanceJson,
+                                        @RequestParam(value = "taskRelationJson", required = true) String taskRelationJson,
                                         @RequestParam(value = "processInstanceId") Integer processInstanceId,
                                         @RequestParam(value = "scheduleTime", required = false) String scheduleTime,
                                         @RequestParam(value = "syncDefine", required = true) Boolean syncDefine,
+                                        @RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams,
                                         @RequestParam(value = "locations", required = false) String locations,
-                                        @RequestParam(value = "flag", required = false) Flag flag) throws ParseException {
+                                        @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
+                                        @RequestParam(value = "tenantCode", required = true) String tenantCode,
+                                        @RequestParam(value = "flag", required = false) Flag flag) {
         Map<String, Object> result = processInstanceService.updateProcessInstance(loginUser, projectCode, processInstanceId,
-            processInstanceJson, scheduleTime, syncDefine, flag, locations);
+            taskRelationJson, scheduleTime, syncDefine, flag, globalParams, locations, timeout, tenantCode);
         return returnDataList(result);
     }
 
@@ -279,9 +285,9 @@ public class ProcessInstanceController extends BaseController {
      * @param taskId task id
      * @return sub process instance detail
      */
-    @ApiOperation(value = "querySubProcessInstanceByTaskId", notes = "QUERY_SUBPROCESS_INSTANCE_BY_TASK_ID_NOTES")
+    @ApiOperation(value = "querySubProcessInstanceByTaskCode", notes = "QUERY_SUBPROCESS_INSTANCE_BY_TASK_CODE_NOTES")
     @ApiImplicitParams({
-        @ApiImplicitParam(name = "taskId", value = "TASK_ID", required = true, dataType = "Int", example = "100")
+        @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, dataType = "Long", example = "100")
     })
     @GetMapping(value = "/select-sub-process")
     @ResponseStatus(HttpStatus.OK)
@@ -333,7 +339,7 @@ public class ProcessInstanceController extends BaseController {
     @ApiException(QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR)
     @AccessLogAnnotation
     public Result viewVariables(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                @RequestParam("processInstanceId") Integer processInstanceId) throws Exception {
+                                @RequestParam("processInstanceId") Integer processInstanceId) {
         Map<String, Object> result = processInstanceService.viewVariables(processInstanceId);
         return returnDataList(result);
     }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index d829a71..f559040 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 
 import java.io.IOException;
-import java.text.ParseException;
 import java.util.List;
 import java.util.Map;
 
@@ -63,7 +62,7 @@ public interface ProcessInstanceService {
      * @param projectCode project code
      * @param pageNo page number
      * @param pageSize page size
-     * @param processDefineId process definition id
+     * @param processDefineCode process definition code
      * @param searchVal search value
      * @param stateType state type
      * @param host host
@@ -73,7 +72,7 @@ public interface ProcessInstanceService {
      */
     Map<String, Object> queryProcessInstanceList(User loginUser,
                                                  long projectCode,
-                                                 Integer processDefineId,
+                                                 long processDefineCode,
                                                  String startDate,
                                                  String endDate,
                                                  String searchVal,
@@ -115,23 +114,28 @@ public interface ProcessInstanceService {
      *
      * @param loginUser login user
      * @param projectCode project code
-     * @param processInstanceJson process instance json
+     * @param taskRelationJson process task relation json
      * @param processInstanceId process instance id
      * @param scheduleTime schedule time
      * @param syncDefine sync define
      * @param flag flag
-     * @param locations locations
+     * @param globalParams global params
+     * @param locations locations for nodes
+     * @param timeout timeout
+     * @param tenantCode tenant code
      * @return update result code
-     * @throws ParseException parse exception for json parse
      */
     Map<String, Object> updateProcessInstance(User loginUser,
                                               long projectCode,
                                               Integer processInstanceId,
-                                              String processInstanceJson,
+                                              String taskRelationJson,
                                               String scheduleTime,
                                               Boolean syncDefine,
                                               Flag flag,
-                                              String locations) throws ParseException;
+                                              String globalParams,
+                                              String locations,
+                                              int timeout,
+                                              String tenantCode);
 
     /**
      * query parent process instance detail info by sub process instance id
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 9b89972..b5679ed 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -50,9 +50,9 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessData;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -64,6 +64,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.io.BufferedReader;
@@ -72,17 +73,13 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -96,8 +93,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 @Service
 public class ProcessInstanceServiceImpl extends BaseServiceImpl implements ProcessInstanceService {
 
-    private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class);
-
     public static final String TASK_TYPE = "taskType";
     public static final String LOCAL_PARAMS_LIST = "localParamsList";
 
@@ -137,6 +132,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
     @Autowired
     UsersService usersService;
 
+    @Autowired
+    private TenantMapper tenantMapper;
+
     /**
      * return top n SUCCESS process instance order by running time which started between startTime and endTime
      */
@@ -204,9 +202,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         } else {
             processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
             processInstance.setLocations(processDefinition.getLocations());
-
-            ProcessData processData = processService.genProcessData(processDefinition);
-            processInstance.setProcessInstanceJson(JSONUtils.toJsonString(processData));
+            processInstance.setDagData(processService.genDagData(processDefinition));
             result.put(DATA_LIST, processInstance);
             putMsg(result, Status.SUCCESS);
         }
@@ -219,9 +215,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
      *
      * @param loginUser login user
      * @param projectCode project code
+     * @param processDefineCode process definition code
      * @param pageNo page number
      * @param pageSize page size
-     * @param processDefineId process definition id
      * @param searchVal search value
      * @param stateType state type
      * @param host host
@@ -232,7 +228,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
     @Override
     public Map<String, Object> queryProcessInstanceList(User loginUser,
                                                         long projectCode,
-                                                        Integer processDefineId,
+                                                        long processDefineCode,
                                                         String startDate,
                                                         String endDate,
                                                         String searchVal,
@@ -265,11 +261,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         PageInfo<ProcessInstance> pageInfo = new PageInfo<>(pageNo, pageSize);
         int executorId = usersService.getUserIdByName(executorName);
 
-        ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefineId);
-
         IPage<ProcessInstance> processInstanceList = processInstanceMapper.queryProcessInstanceListPaging(page,
-            project.getCode(), processDefinition == null ? 0L : processDefinition.getCode(), searchVal,
-            executorId, statusArray, host, start, end);
+            project.getCode(), processDefineCode, searchVal, executorId, statusArray, host, start, end);
 
         List<ProcessInstance> processInstances = processInstanceList.getRecords();
         List<Integer> userIds = CollectionUtils.transformToList(processInstances, ProcessInstance::getExecutorId);
@@ -410,19 +403,22 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
      *
      * @param loginUser login user
      * @param projectCode project code
-     * @param processInstanceJson process instance json
+     * @param taskRelationJson process task relation json
      * @param processInstanceId process instance id
      * @param scheduleTime schedule time
      * @param syncDefine sync define
      * @param flag flag
-     * @param locations locations
+     * @param globalParams global params
+     * @param locations locations for nodes
+     * @param timeout timeout
+     * @param tenantCode tenant code
      * @return update result code
      */
     @Transactional
     @Override
-    public Map<String, Object> updateProcessInstance(User loginUser, long projectCode, Integer processInstanceId,
-                                                     String processInstanceJson, String scheduleTime, Boolean syncDefine,
-                                                     Flag flag, String locations) {
+    public Map<String, Object> updateProcessInstance(User loginUser, long projectCode, Integer processInstanceId, String taskRelationJson,
+                                                     String scheduleTime, Boolean syncDefine, Flag flag, String globalParams,
+                                                     String locations, int timeout, String tenantCode) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
         Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, project.getName());
@@ -441,28 +437,41 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
                 processInstance.getName(), processInstance.getState().toString(), "update");
             return result;
         }
-        ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-            processInstance.getProcessDefinitionVersion());
-        ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
-        //check workflow json is valid   TODO  processInstanceJson --> processTaskRelationJson
-        result = processDefinitionService.checkProcessNodeList(processInstanceJson);
+        ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+        List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
+        //check workflow json is valid
+        result = processDefinitionService.checkProcessNodeList(taskRelationJson);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
-            processDefinition.getUserId());
-        setProcessInstance(processInstance, tenant, scheduleTime, processData);
-        int updateDefine = 1;
+        Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+        if (tenant == null) {
+            putMsg(result, Status.TENANT_NOT_EXIST);
+            return result;
+        }
+        setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout);
         if (Boolean.TRUE.equals(syncDefine)) {
-            processDefinition.setId(processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()).getId());
-            updateDefine = syncDefinition(loginUser, project, locations, processInstance, processDefinition, processData);
-
-            processInstance.setProcessDefinitionVersion(processDefinitionLogMapper.
-                queryMaxVersionForDefinition(processInstance.getProcessDefinitionCode()));
+            processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenant.getId());
+            processDefinition.setUpdateTime(new Date());
+            int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, false);
+            if (insertVersion > 0) {
+                int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
+                    processDefinition.getCode(), insertVersion, taskRelationList);
+                if (insertResult > 0) {
+                    putMsg(result, Status.SUCCESS);
+                    result.put(Constants.DATA_LIST, processDefinition);
+                } else {
+                    putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+                    return result;
+                }
+            } else {
+                putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+                return result;
+            }
+            processInstance.setProcessDefinitionVersion(insertVersion);
         }
-
         int update = processService.updateProcessInstance(processInstance);
-        if (update > 0 && updateDefine > 0) {
+        if (update > 0) {
             putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
@@ -471,42 +480,19 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
     }
 
     /**
-     * sync definition according process instance
-     */
-    private int syncDefinition(User loginUser, Project project, String locations, ProcessInstance processInstance,
-                               ProcessDefinition processDefinition, ProcessData processData) {
-
-        String originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
-        processDefinition.setGlobalParams(originDefParams);
-        processDefinition.setLocations(locations);
-        processDefinition.setTimeout(processInstance.getTimeout());
-        processDefinition.setUpdateTime(new Date());
-
-        return processService.saveProcessDefinition(loginUser, project, processDefinition.getName(),
-            processDefinition.getDescription(), locations, processData, processDefinition, false);
-    }
-
-    /**
      * update process instance attributes
      */
-    private void setProcessInstance(ProcessInstance processInstance, Tenant tenant, String scheduleTime, ProcessData processData) {
-
+    private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout) {
         Date schedule = processInstance.getScheduleTime();
         if (scheduleTime != null) {
             schedule = DateUtils.getScheduleDate(scheduleTime);
         }
         processInstance.setScheduleTime(schedule);
-        List<Property> globalParamList = processData.getGlobalParams();
-        Map<String, String> globalParamMap = Optional.ofNullable(globalParamList)
-            .orElse(Collections.emptyList())
-            .stream()
-            .collect(Collectors.toMap(Property::getProp, Property::getValue));
-        String globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
-            processInstance.getCmdTypeIfComplement(), schedule);
-        processInstance.setTimeout(processData.getTimeout());
-        if (tenant != null) {
-            processInstance.setTenantCode(tenant.getTenantCode());
-        }
+        List<Property> globalParamList = JSONUtils.toList(globalParams, Property.class);
+        Map<String, String> globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
+        globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule);
+        processInstance.setTimeout(timeout);
+        processInstance.setTenantCode(tenantCode);
         processInstance.setGlobalParams(globalParams);
     }
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
index 099f62c..9bd4fbd 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
@@ -73,8 +73,8 @@ public class ProcessDefinitionControllerTest {
     @Test
     public void testCreateProcessDefinition() throws Exception {
         String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
-                + "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
-                + "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]";
+                + "\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+                + "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]";
 
         long projectCode = 1L;
         String name = "dag_test";
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
index 959418c..525be35 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
@@ -42,7 +42,7 @@ public class ProcessInstanceControllerTest extends AbstractControllerTest {
     @Test
     public void testQueryProcessInstanceList() throws Exception {
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
-        paramsMap.add("processDefinitionId", "91");
+        paramsMap.add("processDefineCode", "91");
         paramsMap.add("searchVal", "cxc");
         paramsMap.add("stateType", String.valueOf(ExecutionStatus.SUCCESS));
         paramsMap.add("host", "192.168.1.13");
@@ -78,11 +78,13 @@ public class ProcessInstanceControllerTest extends AbstractControllerTest {
 
     @Test
     public void testUpdateProcessInstance() throws Exception {
-        String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
+        String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+            + "\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+            + "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]";
         String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
 
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
-        paramsMap.add("processInstanceJson", json);
+        paramsMap.add("taskRelationJson", json);
         paramsMap.add("processInstanceId", "91");
         paramsMap.add("scheduleTime", "2019-12-15 00:00:00");
         paramsMap.add("syncDefine", "false");
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index 5bc5e84..45f50cd 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -49,6 +49,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.io.IOException;
@@ -108,11 +109,12 @@ public class ProcessInstanceServiceTest {
     @Mock
     UsersService usersService;
 
-    private String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\","
-        + "\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"},"
-        + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\","
-        + "\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\","
-        + "\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
+    @Mock
+    TenantMapper tenantMapper;
+
+    private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+        + "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+        + "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
 
     @Test
     public void testQueryProcessInstanceList() {
@@ -368,7 +370,7 @@ public class ProcessInstanceServiceTest {
         when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
         Map<String, Object> proejctAuthFailRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, "2020-02-21 00:00:00", true, Flag.YES, "");
+            shellJson, "2020-02-21 00:00:00", true, Flag.YES, "", "", 0, "");
         Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS));
 
         //process instance null
@@ -378,7 +380,7 @@ public class ProcessInstanceServiceTest {
         when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
         when(processService.findProcessInstanceDetailById(1)).thenReturn(null);
         Map<String, Object> processInstanceNullRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, "2020-02-21 00:00:00", true, Flag.YES, "");
+            shellJson, "2020-02-21 00:00:00", true, Flag.YES, "", "", 0, "");
         Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS));
 
         //process instance not finish
@@ -386,7 +388,7 @@ public class ProcessInstanceServiceTest {
         processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         putMsg(result, Status.SUCCESS, projectCode);
         Map<String, Object> processInstanceNotFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, "2020-02-21 00:00:00", true, Flag.YES, "");
+            shellJson, "2020-02-21 00:00:00", true, Flag.YES, "", "", 0, "");
         Assert.assertEquals(Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstanceNotFinishRes.get(Constants.STATUS));
 
         //process instance finish
@@ -398,29 +400,23 @@ public class ProcessInstanceServiceTest {
         ProcessDefinition processDefinition = getProcessDefinition();
         processDefinition.setId(1);
         processDefinition.setUserId(1);
-        Tenant tenant = new Tenant();
-        tenant.setId(1);
-        tenant.setTenantCode("test_tenant");
+        Tenant tenant = getTenant();
         when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
+        when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant);
         when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant);
         when(processService.updateProcessInstance(processInstance)).thenReturn(1);
         when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result);
-        when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-            processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition);
         putMsg(result, Status.SUCCESS, projectCode);
         Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, "2020-02-21 00:00:00", true, Flag.YES, "");
-        Assert.assertEquals(Status.UPDATE_PROCESS_INSTANCE_ERROR, processInstanceFinishRes.get(Constants.STATUS));
+            shellJson, "2020-02-21 00:00:00", true, Flag.YES, "", "", 0, "root");
+        Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR, processInstanceFinishRes.get(Constants.STATUS));
 
         //success
-        when(processService.saveProcessDefinition(Mockito.any(), Mockito.any(),
-            Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
-            Mockito.any(), Mockito.any(), Mockito.anyBoolean())).thenReturn(1);
-        when(processService.findProcessDefinition(46L, 0)).thenReturn(processDefinition);
+        when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
         putMsg(result, Status.SUCCESS, projectCode);
 
         Map<String, Object> successRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, "2020-02-21 00:00:00", true, Flag.YES, "");
+            shellJson, "2020-02-21 00:00:00", false, Flag.YES, "", "", 0, "root");
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
     }
 
@@ -488,7 +484,6 @@ public class ProcessInstanceServiceTest {
         ProcessInstance processInstance = getProcessInstance();
         processInstance.setCommandType(CommandType.SCHEDULER);
         processInstance.setScheduleTime(new Date());
-        processInstance.setProcessInstanceJson(shellJson);
         processInstance.setGlobalParams("");
         when(processInstanceMapper.queryDetailById(1)).thenReturn(processInstance);
         Map<String, Object> successRes = processInstanceService.viewVariables(1);
@@ -498,7 +493,6 @@ public class ProcessInstanceServiceTest {
     @Test
     public void testViewGantt() throws Exception {
         ProcessInstance processInstance = getProcessInstance();
-        processInstance.setProcessInstanceJson(shellJson);
         TaskInstance taskInstance = getTaskInstance();
         taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         taskInstance.setStartTime(new Date());
@@ -583,6 +577,13 @@ public class ProcessInstanceServiceTest {
         return processDefinition;
     }
 
+    private Tenant getTenant() {
+        Tenant tenant = new Tenant();
+        tenant.setId(1);
+        tenant.setTenantCode("root");
+        return tenant;
+    }
+
     /**
      * get Mock worker group
      *
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index 6f170df..a655440 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -81,13 +81,6 @@ public class ProcessDefinition {
     private long projectCode;
 
     /**
-     * definition json string
-     * TODO: delete
-     */
-    @TableField(exist = false)
-    private String processDefinitionJson;
-
-    /**
      * description
      */
     private String description;
@@ -250,14 +243,6 @@ public class ProcessDefinition {
         this.releaseState = releaseState;
     }
 
-    public String getProcessDefinitionJson() {
-        return processDefinitionJson;
-    }
-
-    public void setProcessDefinitionJson(String processDefinitionJson) {
-        this.processDefinitionJson = processDefinitionJson;
-    }
-
     public Date getCreateTime() {
         return createTime;
     }
@@ -439,7 +424,6 @@ public class ProcessDefinition {
             + ", releaseState=" + releaseState
             + ", projectId=" + projectId
             + ", projectCode=" + projectCode
-            + ", processDefinitionJson='" + processDefinitionJson + '\''
             + ", description='" + description + '\''
             + ", globalParams='" + globalParams + '\''
             + ", globalParamList=" + globalParamList
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 48a3d79..dd98fca 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -151,11 +151,10 @@ public class ProcessInstance {
     private String globalParams;
 
     /**
-     * process instance json
-     * TODO delete
+     * DAG data for this process instance; not mapped to a database column
      */
     @TableField(exist = false)
-    private String processInstanceJson;
+    private DagData dagData;
 
     /**
      * executor id
@@ -419,12 +418,12 @@ public class ProcessInstance {
         this.globalParams = globalParams;
     }
 
-    public String getProcessInstanceJson() {
-        return processInstanceJson;
+    public DagData getDagData() {
+        return dagData;
     }
 
-    public void setProcessInstanceJson(String processInstanceJson) {
-        this.processInstanceJson = processInstanceJson;
+    public void setDagData(DagData dagData) {
+        this.dagData = dagData;
     }
 
     public String getTenantCode() {
@@ -620,9 +619,6 @@ public class ProcessInstance {
             + ", globalParams='"
             + globalParams
             + '\''
-            + ", processInstanceJson='"
-            + processInstanceJson
-            + '\''
             + ", executorId="
             + executorId
             + ", tenantCode='"
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index e64af10..c71eb3b 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -33,7 +33,6 @@ import static java.util.stream.Collectors.toSet;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.ConditionType;
 import org.apache.dolphinscheduler.common.enums.Direct;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -58,8 +57,6 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
@@ -104,7 +101,6 @@ import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 
 import java.util.ArrayList;
@@ -946,12 +942,11 @@ public class ProcessService {
      * set sub work process flag, extends parent work process command parameters
      *
      * @param subProcessInstance subProcessInstance
-     * @return process instance
      */
-    public ProcessInstance setSubProcessParam(ProcessInstance subProcessInstance) {
+    public void setSubProcessParam(ProcessInstance subProcessInstance) {
         String cmdParam = subProcessInstance.getCommandParam();
         if (StringUtils.isEmpty(cmdParam)) {
-            return subProcessInstance;
+            return;
         }
         Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
         // write sub process id into cmd param.
@@ -977,13 +972,12 @@ public class ProcessService {
         }
         ProcessInstanceMap processInstanceMap = JSONUtils.parseObject(cmdParam, ProcessInstanceMap.class);
         if (processInstanceMap == null || processInstanceMap.getParentProcessInstanceId() == 0) {
-            return subProcessInstance;
+            return;
         }
         // update sub process id to process map table
         processInstanceMap.setProcessInstanceId(subProcessInstance.getId());
 
         this.updateWorkProcessInstanceMap(processInstanceMap);
-        return subProcessInstance;
     }
 
     /**
@@ -2075,43 +2069,6 @@ public class ProcessService {
     }
 
     /**
-     * update task definition
-     */
-    public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {
-        Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode());
-        Date now = new Date();
-        taskDefinition.setProjectCode(projectCode);
-        taskDefinition.setUserId(operator.getId());
-        taskDefinition.setVersion(version == null || version == 0 ? 1 : version + 1);
-        taskDefinition.setUpdateTime(now);
-        setTaskFromTaskNode(taskNode, taskDefinition);
-        int update = taskDefinitionMapper.updateById(taskDefinition);
-        // save task definition log
-        TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
-        taskDefinitionLog.setOperator(operator.getId());
-        taskDefinitionLog.setOperateTime(now);
-        int insert = taskDefinitionLogMapper.insert(taskDefinitionLog);
-        return insert & update;
-    }
-
-    private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition taskDefinition) {
-        taskDefinition.setName(taskNode.getName());
-        taskDefinition.setDescription(taskNode.getDesc());
-        taskDefinition.setTaskType(taskNode.getType().toUpperCase());
-        taskDefinition.setTaskParams(taskNode.getTaskParams());
-        taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
-        taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
-        taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
-        taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes());
-        taskDefinition.setFailRetryInterval(taskNode.getRetryInterval());
-        taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
-        taskDefinition.setTimeoutNotifyStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
-        taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
-        taskDefinition.setDelayTime(taskNode.getDelayTime());
-        taskDefinition.setResourceIds(getResourceIds(taskDefinition));
-    }
-
-    /**
      * get resource ids
      *
      * @param taskDefinition taskDefinition
@@ -2179,174 +2136,6 @@ public class ProcessService {
         return result & resultLog;
     }
 
-    /**
-     * save processDefinition (including create or update processDefinition)
-     */
-    @Deprecated
-    public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
-                                     ProcessData processData, ProcessDefinition processDefinition, Boolean isFromProcessDefine) {
-        ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(),
-            name, processData, project, desc, locations);
-        Map<String, TaskDefinition> taskDefinitionMap = handleTaskDefinition(operator, project.getCode(), processData.getTasks(), isFromProcessDefine);
-        if (Constants.DEFINITION_FAILURE == handleTaskRelation(operator, project.getCode(), processDefinitionLog, processData.getTasks(), taskDefinitionMap)) {
-            return Constants.DEFINITION_FAILURE;
-        }
-        return processDefinitionToDB(processDefinition, processDefinitionLog, isFromProcessDefine);
-    }
-
-    /**
-     * save processDefinition
-     */
-    @Deprecated
-    public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName,
-                                                           ProcessData processData, Project project, String desc, String locations) {
-        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
-        Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionCode);
-        processDefinitionLog.setUserId(operator.getId());
-        processDefinitionLog.setCode(processDefinitionCode);
-        processDefinitionLog.setVersion(version == null || version == 0 ? 1 : version + 1);
-        processDefinitionLog.setName(processDefinitionName);
-        processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
-        processDefinitionLog.setProjectCode(project.getCode());
-        processDefinitionLog.setDescription(desc);
-        processDefinitionLog.setLocations(locations);
-        processDefinitionLog.setTimeout(processData.getTimeout());
-        processDefinitionLog.setTenantId(processData.getTenantId());
-        processDefinitionLog.setOperator(operator.getId());
-        Date now = new Date();
-        processDefinitionLog.setOperateTime(now);
-        processDefinitionLog.setUpdateTime(now);
-        processDefinitionLog.setCreateTime(now);
-
-        //custom global params
-        List<Property> globalParamsList = new ArrayList<>();
-        if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) {
-            Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
-            globalParamsList = new ArrayList<>(userDefParamsSet);
-        }
-        processDefinitionLog.setGlobalParamList(globalParamsList);
-        processDefinitionLog.setFlag(Flag.YES);
-        int insert = processDefineLogMapper.insert(processDefinitionLog);
-        if (insert > 0) {
-            return processDefinitionLog;
-        }
-        return null;
-    }
-
-    /**
-     * handle task definition
-     */
-    @Deprecated
-    public Map<String, TaskDefinition> handleTaskDefinition(User operator, Long projectCode, List<TaskNode> taskNodes, Boolean isFromProcessDefine) {
-        if (taskNodes == null) {
-            return null;
-        }
-        Map<String, TaskDefinition> taskDefinitionMap = new HashMap<>();
-        for (TaskNode taskNode : taskNodes) {
-            TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskNode.getCode());
-            if (taskDefinition == null) {
-                try {
-                    long code = SnowFlakeUtils.getInstance().nextId();
-                    taskDefinition = new TaskDefinition();
-                    taskDefinition.setCode(code);
-                } catch (SnowFlakeException e) {
-                    throw new ServiceException("Task code get error", e);
-                }
-                saveTaskDefinition(operator, projectCode, taskNode, taskDefinition);
-            } else {
-                if (isFromProcessDefine && isTaskOnline(taskDefinition.getCode())) {
-                    throw new ServiceException(String.format("The task %s is on line in process", taskNode.getName()));
-                }
-                updateTaskDefinition(operator, projectCode, taskNode, taskDefinition);
-            }
-            taskDefinitionMap.put(taskNode.getName(), taskDefinition);
-        }
-        return taskDefinitionMap;
-    }
-
-    /**
-     * handle task relations
-     */
-    public int handleTaskRelation(User operator,
-                                  Long projectCode,
-                                  ProcessDefinition processDefinition,
-                                  List<TaskNode> taskNodes,
-                                  Map<String, TaskDefinition> taskDefinitionMap) {
-        if (null == processDefinition || null == taskNodes || null == taskDefinitionMap) {
-            return Constants.DEFINITION_FAILURE;
-        }
-        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
-        if (!processTaskRelationList.isEmpty()) {
-            processTaskRelationMapper.deleteByCode(projectCode, processDefinition.getCode());
-        }
-        List<ProcessTaskRelation> builderRelationList = new ArrayList<>();
-        Date now = new Date();
-        for (TaskNode taskNode : taskNodes) {
-            List<String> depList = taskNode.getDepList();
-            if (CollectionUtils.isNotEmpty(depList)) {
-                for (String preTaskName : depList) {
-                    builderRelationList.add(new ProcessTaskRelation(
-                        StringUtils.EMPTY,
-                        processDefinition.getVersion(),
-                        projectCode,
-                        processDefinition.getCode(),
-                        taskDefinitionMap.get(preTaskName).getCode(),
-                        taskDefinitionMap.get(preTaskName).getVersion(),
-                        taskDefinitionMap.get(taskNode.getName()).getCode(),
-                        taskDefinitionMap.get(taskNode.getName()).getVersion(),
-                        ConditionType.NONE,
-                        StringUtils.EMPTY,
-                        now,
-                        now));
-                }
-            } else {
-                builderRelationList.add(new ProcessTaskRelation(
-                    StringUtils.EMPTY,
-                    processDefinition.getVersion(),
-                    projectCode,
-                    processDefinition.getCode(),
-                    0L, // this isn't previous task node, set zero
-                    0,
-                    taskDefinitionMap.get(taskNode.getName()).getCode(),
-                    taskDefinitionMap.get(taskNode.getName()).getVersion(),
-                    ConditionType.NONE,
-                    StringUtils.EMPTY,
-                    now,
-                    now));
-            }
-        }
-        for (ProcessTaskRelation processTaskRelation : builderRelationList) {
-            saveTaskRelation(operator, processTaskRelation);
-        }
-        return 0;
-    }
-
-    public void saveTaskRelation(User operator, ProcessTaskRelation processTaskRelation) {
-        processTaskRelationMapper.insert(processTaskRelation);
-        // save process task relation log
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
-        processTaskRelationLog.setOperator(operator.getId());
-        processTaskRelationLog.setOperateTime(new Date());
-        processTaskRelationLogMapper.insert(processTaskRelationLog);
-    }
-
-    public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {
-        Date now = new Date();
-        taskDefinition.setProjectCode(projectCode);
-        taskDefinition.setUserId(operator.getId());
-        taskDefinition.setVersion(1);
-        taskDefinition.setUpdateTime(now);
-        taskDefinition.setCreateTime(now);
-        setTaskFromTaskNode(taskNode, taskDefinition);
-        // save the new task definition
-        int insert = taskDefinitionMapper.insert(taskDefinition);
-        TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
-        taskDefinitionLog.setOperator(operator.getId());
-        taskDefinitionLog.setOperateTime(now);
-        int logInsert = taskDefinitionLogMapper.insert(taskDefinitionLog);
-        return insert & logInsert;
-    }
-
     public boolean isTaskOnline(Long taskCode) {
         List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
         if (!processTaskRelationList.isEmpty()) {
@@ -2400,22 +2189,6 @@ public class ProcessService {
         return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
     }
 
-    /**
-     * generate ProcessData
-     * it will be replaced by genDagData method
-     */
-    @Deprecated
-    public ProcessData genProcessData(ProcessDefinition processDefinition) {
-        Map<String, String> locationMap = locationToMap(processDefinition.getLocations());
-        List<TaskNode> taskNodes = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion(), locationMap);
-        ProcessData processData = new ProcessData();
-        processData.setTasks(taskNodes);
-        processData.setGlobalParams(JSONUtils.toList(processDefinition.getGlobalParams(), Property.class));
-        processData.setTenantId(processDefinition.getTenantId());
-        processData.setTimeout(processDefinition.getTimeout());
-        return processData;
-    }
-
     @Deprecated
     public List<TaskNode> genTaskNodeList(Long processCode, int processVersion, Map<String, String> locationMap) {
         List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
@@ -2501,26 +2274,6 @@ public class ProcessService {
     }
 
     /**
-     * parse locations
-     *
-     * @param locations processDefinition locations
-     * @return key:taskName,value:taskId
-     */
-    public Map<String, String> locationToMap(String locations) {
-        Map<String, String> frontTaskIdAndNameMap = new HashMap<>();
-        if (StringUtils.isBlank(locations)) {
-            return frontTaskIdAndNameMap;
-        }
-        ObjectNode jsonNodes = JSONUtils.parseObject(locations);
-        Iterator<Entry<String, JsonNode>> fields = jsonNodes.fields();
-        while (fields.hasNext()) {
-            Entry<String, JsonNode> jsonNodeEntry = fields.next();
-            frontTaskIdAndNameMap.put(JSONUtils.findValue(jsonNodeEntry.getValue(), "name"), jsonNodeEntry.getKey());
-        }
-        return frontTaskIdAndNameMap;
-    }
-
-    /**
      * add authorized resources
      *
      * @param ownResources own resources
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 413a9d8..1cdc98a 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -246,13 +246,6 @@ public class ProcessServiceTest {
         processDefinition.setName("test");
         processDefinition.setVersion(1);
         processDefinition.setCode(11L);
-        processDefinition.setProcessDefinitionJson("{\"globalParams\":[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"tasks\":[{\"conditionResult\":"
-                + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
-                + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
-                + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
-                + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
-                + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
-                + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
         processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
         ProcessInstance processInstance = new ProcessInstance();
         processInstance.setId(222);
@@ -356,27 +349,6 @@ public class ProcessServiceTest {
     }
 
     @Test
-    public void testSaveProcessDefinition() {
-        User user = new User();
-        user.setId(1);
-
-        Project project = new Project();
-        project.setCode(1L);
-
-        ProcessData processData = new ProcessData();
-        processData.setTasks(new ArrayList<>());
-
-        ProcessDefinition processDefinition = new ProcessDefinition();
-        processDefinition.setCode(1L);
-        processDefinition.setId(123);
-        processDefinition.setName("test");
-        processDefinition.setVersion(1);
-        processDefinition.setCode(11L);
-        Assert.assertEquals(-1, processService.saveProcessDefinition(user, project, "name",
-                "desc", "locations", processData, processDefinition, true));
-    }
-
-    @Test
     public void testSwitchVersion() {
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setCode(1L);
@@ -445,80 +417,6 @@ public class ProcessServiceTest {
     }
 
     @Test
-    public void testGenProcessData() {
-        String processDefinitionJson = "{\"tasks\":[{\"id\":null,\"code\":3,\"version\":0,\"name\":\"1-test\",\"desc\":null,"
-                + "\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":0,"
-                + "\"params\":{},\"preTasks\":[\"unit-test\"],\"preTaskNodeList\":[{\"code\":2,\"name\":\"unit-test\","
-                + "\"version\":0}],\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null,\"conditionResult\":null,"
-                + "\"taskInstancePriority\":null,\"workerGroup\":null,\"timeout\":{\"enable\":false,\"strategy\":null,"
-                + "\"interval\":0},\"delayTime\":0}],\"globalParams\":[],\"timeout\":0,\"tenantId\":0}";
-
-        ProcessDefinition processDefinition = new ProcessDefinition();
-        processDefinition.setCode(1L);
-        processDefinition.setId(123);
-        processDefinition.setName("test");
-        processDefinition.setVersion(1);
-        processDefinition.setCode(11L);
-
-        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
-        processTaskRelationLog.setName("def 1");
-        processTaskRelationLog.setProcessDefinitionVersion(1);
-        processTaskRelationLog.setProjectCode(1L);
-        processTaskRelationLog.setProcessDefinitionCode(1L);
-        processTaskRelationLog.setPostTaskCode(3L);
-        processTaskRelationLog.setPreTaskCode(2L);
-        processTaskRelationLog.setUpdateTime(new Date());
-        processTaskRelationLog.setCreateTime(new Date());
-        List<ProcessTaskRelationLog> list = new ArrayList<>();
-        list.add(processTaskRelationLog);
-
-        TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
-        taskDefinition.setCode(3L);
-        taskDefinition.setName("1-test");
-        taskDefinition.setProjectCode(1L);
-        taskDefinition.setTaskType(TaskType.SHELL.getDesc());
-        taskDefinition.setUserId(1);
-        taskDefinition.setVersion(2);
-        taskDefinition.setCreateTime(new Date());
-        taskDefinition.setUpdateTime(new Date());
-
-        TaskDefinitionLog td2 = new TaskDefinitionLog();
-        td2.setCode(2L);
-        td2.setName("unit-test");
-        td2.setProjectCode(1L);
-        td2.setTaskType(TaskType.SHELL.getDesc());
-        td2.setUserId(1);
-        td2.setVersion(1);
-        td2.setCreateTime(new Date());
-        td2.setUpdateTime(new Date());
-
-        List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
-        taskDefinitionLogs.add(taskDefinition);
-        taskDefinitionLogs.add(td2);
-
-        Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs);
-        Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt())).thenReturn(list);
-        String json = JSONUtils.toJsonString(processService.genProcessData(processDefinition));
-
-        Assert.assertEquals(processDefinitionJson, json);
-    }
-
-    @Test
-    public void locationToMap() {
-        String locations = "{\"tasks-64888\":{\"name\":\"test_a\",\"targetarr\":\"\",\"nodenumber\":\"1\",\"x\":134,\"y\":183},"
-                + "\"tasks-24501\":{\"name\":\"test_b\",\"targetarr\":\"tasks-64888\",\"nodenumber\":\"0\",\"x\":392,\"y\":184},"
-                + "\"tasks-81137\":{\"name\":\"test_c\",\"targetarr\":\"\",\"nodenumber\":\"1\",\"x\":122,\"y\":327},"
-                + "\"tasks-41367\":{\"name\":\"test_d\",\"targetarr\":\"tasks-81137\",\"nodenumber\":\"0\",\"x\":409,\"y\":324}}";
-        Map<String, String> frontTaskIdAndNameMap = new HashMap<>();
-        frontTaskIdAndNameMap.put("test_a", "tasks-64888");
-        frontTaskIdAndNameMap.put("test_b", "tasks-24501");
-        frontTaskIdAndNameMap.put("test_c", "tasks-81137");
-        frontTaskIdAndNameMap.put("test_d", "tasks-41367");
-        Map<String, String> locationToMap = processService.locationToMap(locations);
-        Assert.assertEquals(frontTaskIdAndNameMap, locationToMap);
-    }
-
-    @Test
     public void testCreateCommand() {
         Command command = new Command();
         command.setProcessDefinitionCode(123);
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index f396781..f6575f4 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -296,7 +296,6 @@ CREATE TABLE t_ds_process_definition (
   user_id int DEFAULT NULL ,
   global_params text ,
   locations text ,
-  connects text ,
   warning_group_id int DEFAULT NULL ,
   flag int DEFAULT NULL ,
   timeout int DEFAULT '0' ,
@@ -321,7 +320,6 @@ CREATE TABLE t_ds_process_definition_log (
   user_id int DEFAULT NULL ,
   global_params text ,
   locations text ,
-  connects text ,
   warning_group_id int DEFAULT NULL ,
   flag int DEFAULT NULL ,
   timeout int DEFAULT '0' ,