Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/02/07 08:15:43 UTC

[incubator-dolphinscheduler] branch json_split updated: [Feature][JsonSplit] refactor process definition update (#4708)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new f31eee4  [Feature][JsonSplit] refactor process definition update  (#4708)
f31eee4 is described below

commit f31eee43411575e12f4f8b0ef27fc64f7932f296
Author: bao liang <29...@users.noreply.github.com>
AuthorDate: Sun Feb 7 16:15:35 2021 +0800

    [Feature][JsonSplit] refactor process definition update  (#4708)
    
    * add code in task_instance and process instance
    
    * delete process_definition_id in t_ds_task_instance
    
    * add task_code task_definition_version process_definition_code in task instance
    
    * add task_code task_definition_version process_definition_code in task instance
    
    * refactor process instance
    
    * refactor process instance update
    
    * refactor json-split for process definition and task definition
    refactor process instance update
    
    * refactor json-split for process definition and task definition
    refactor process instance update
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * refactor code
---
 .../api/service/ProcessInstanceService.java        | 146 +++++++-----
 .../service/impl/ProcessDefinitionServiceImpl.java | 154 ++-----------
 .../service/impl/TaskDefinitionServiceImpl.java    |  63 +-----
 .../dao/entity/ProcessDefinition.java              |   2 +
 .../dao/entity/ProcessInstance.java                |  37 +++
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  43 ++++
 .../dao/mapper/ProcessDefinitionLogMapper.java     |  11 +-
 .../dao/mapper/ProcessDefinitionLogMapper.xml      |  10 +-
 .../dao/mapper/ProcessInstanceMapper.xml           |   4 +-
 .../dao/mapper/TaskInstanceMapper.xml              |  10 +-
 .../service/process/ProcessService.java            | 248 +++++++++++++++++++++
 sql/dolphinscheduler-postgre.sql                   |   2 +-
 sql/dolphinscheduler_mysql.sql                     |   4 +-
 13 files changed, 466 insertions(+), 268 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index e74b2b4..7976882 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -190,6 +191,7 @@ public class ProcessInstanceService extends BaseService {
 
         ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
         processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
+        processInstance.setProcessDefinitionId(processDefinition.getId());
         result.put(DATA_LIST, processInstance);
         putMsg(result, Status.SUCCESS);
 
@@ -406,91 +408,123 @@ public class ProcessInstanceService extends BaseService {
                                                      Flag flag, String locations, String connects) throws ParseException {
         Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByName(projectName);
-
         //check project permission
         Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
         Status resultEnum = (Status) checkResult.get(Constants.STATUS);
         if (resultEnum != Status.SUCCESS) {
             return checkResult;
         }
-
         //check process instance exists
         ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
         if (processInstance == null) {
             putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
             return result;
         }
-
         //check process instance status
         if (!processInstance.getState().typeIsFinished()) {
             putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
                     processInstance.getName(), processInstance.getState().toString(), "update");
             return result;
         }
-        Date schedule = null;
-        schedule = processInstance.getScheduleTime();
-        if (scheduleTime != null) {
-            schedule = DateUtils.getScheduleDate(scheduleTime);
+        ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion());
+        ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
+        //check workflow json is valid
+        result = processDefinitionService.checkProcessNodeList(processData, processInstanceJson);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
         }
-        processInstance.setScheduleTime(schedule);
-        processInstance.setLocations(locations);
-        processInstance.setConnects(connects);
-        String globalParams = null;
-        String originDefParams = null;
-        int timeout = processInstance.getTimeout();
-        ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
-        if (StringUtils.isNotEmpty(processInstanceJson)) {
-            ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
-            //check workflow json is valid
-            Map<String, Object> checkFlowJson = processDefinitionService.checkProcessNodeList(processData, processInstanceJson);
-            if (checkFlowJson.get(Constants.STATUS) != Status.SUCCESS) {
-                return result;
-            }
-
-            originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
-            List<Property> globalParamList = processData.getGlobalParams();
-            Map<String, String> globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
-            globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
-                    processInstance.getCmdTypeIfComplement(), schedule);
-            timeout = processData.getTimeout();
-            processInstance.setTimeout(timeout);
-            Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
-                    processDefinition.getUserId());
-            if (tenant != null) {
-                processInstance.setTenantCode(tenant.getTenantCode());
-            }
-            // get the processinstancejson before saving,and then save the name and taskid
-            String oldJson = processInstance.getProcessInstanceJson();
-            if (StringUtils.isNotEmpty(oldJson)) {
-                processInstanceJson = processService.changeJson(processData,oldJson);
-            }
-            processInstance.setProcessInstanceJson(processInstanceJson);
-            processInstance.setGlobalParams(globalParams);
+        Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
+                processDefinition.getUserId());
+        // get the processInstanceJson before saving, and then save the name and task id
+        String oldJson = processInstance.getProcessInstanceJson();
+        if (StringUtils.isNotEmpty(oldJson)) {
+            processInstanceJson = processService.changeJson(processData, oldJson);
         }
-
+        setProcessInstance(processInstance, tenant, scheduleTime, locations,
+                connects, processInstanceJson, processData);
         int update = processService.updateProcessInstance(processInstance);
         int updateDefine = 1;
         if (Boolean.TRUE.equals(syncDefine)) {
-            processDefinition.setProcessDefinitionJson(processInstanceJson);
-            processDefinition.setGlobalParams(originDefParams);
-            processDefinition.setLocations(locations);
-            processDefinition.setConnects(connects);
-            processDefinition.setTimeout(timeout);
-            processDefinition.setUpdateTime(new Date());
-
-            // add process definition version
-            int version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
-            processDefinition.setVersion(version);
-            updateDefine = processDefineMapper.updateById(processDefinition);
+            updateDefine = syncDefinition(loginUser, project, processInstanceJson, locations, connects,
+                    processInstance, processDefinition, processData);
         }
         if (update > 0 && updateDefine > 0) {
             putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
         }
-
         return result;
+    }
 
+    /**
+     * sync the process definition according to the process instance
+     *
+     * @param loginUser login user
+     * @param project project
+     * @param processInstanceJson process instance json
+     * @param locations node locations
+     * @param connects node connects
+     * @param processInstance process instance
+     * @param processDefinition process definition
+     * @param processData process data parsed from the instance json
+     * @return update result of the process definition
+     */
+    private int syncDefinition(User loginUser, Project project, String processInstanceJson, String locations, String connects,
+                               ProcessInstance processInstance, ProcessDefinition processDefinition,
+                               ProcessData processData) {
+
+        String originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
+        processDefinition.setProcessDefinitionJson(processInstanceJson);
+        processDefinition.setGlobalParams(originDefParams);
+        processDefinition.setLocations(locations);
+        processDefinition.setConnects(connects);
+        processDefinition.setTimeout(processInstance.getTimeout());
+        processDefinition.setUpdateTime(new Date());
+
+        int updateDefine = processService.saveProcessDefinition(loginUser, project, processDefinition.getName(),
+                processDefinition.getDescription(), locations, connects,
+                processData, processDefinition);
+        return updateDefine;
+    }
+
+    /**
+     * update process instance attributes
+     *
+     * @param processInstance process instance to update
+     * @param tenant tenant of the process definition owner, may be null
+     * @param scheduleTime schedule time string, may be null
+     * @param locations node locations
+     * @param connects node connects
+     * @param processInstanceJson process instance json
+     * @param processData process data parsed from the instance json
+     */
+    private void setProcessInstance(ProcessInstance processInstance, Tenant tenant,
+                                    String scheduleTime, String locations, String connects, String processInstanceJson,
+                                    ProcessData processData) {
+
+        Date schedule = processInstance.getScheduleTime();
+        if (scheduleTime != null) {
+            schedule = DateUtils.getScheduleDate(scheduleTime);
+        }
+        processInstance.setScheduleTime(schedule);
+        processInstance.setLocations(locations);
+        processInstance.setConnects(connects);
+        if (StringUtils.isEmpty(processInstanceJson)) {
+            return;
+        }
+        List<Property> globalParamList = processData.getGlobalParams();
+        Map<String, String> globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
+        String globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
+                processInstance.getCmdTypeIfComplement(), schedule);
+        int timeout = processData.getTimeout();
+        processInstance.setTimeout(timeout);
+        if (tenant != null) {
+            processInstance.setTenantCode(tenant.getTenantCode());
+        }
+        processInstance.setProcessInstanceJson(processInstanceJson);
+        processInstance.setGlobalParams(globalParams);
     }
 
     /**
@@ -705,13 +739,9 @@ public class ProcessInstanceService extends BaseService {
     private static DAG<String, TaskNode, TaskNodeRelation> processInstance2DAG(ProcessInstance processInstance) {
 
         String processDefinitionJson = processInstance.getProcessInstanceJson();
-
         ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
-
         List<TaskNode> taskNodeList = processData.getTasks();
-
         ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
-
         return DagHelper.buildDagGraph(processDag);
     }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index f704971..03515e3 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
-import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -48,7 +47,6 @@ import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.ProcessDag;
-import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -66,24 +64,24 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.service.permission.PermissionCheck;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -158,12 +156,15 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
     /**
      * create process definition
      *
      * @param loginUser login user
      * @param projectName project name
-     * @param name process definition name
+     * @param processDefinitionName process definition name
      * @param processDefinitionJson process definition json
      * @param desc description
      * @param locations locations for nodes
@@ -174,7 +175,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
     @Transactional(rollbackFor = Exception.class)
     public Map<String, Object> createProcessDefinition(User loginUser,
                                                        String projectName,
-                                                       String name,
+                                                       String processDefinitionName,
                                                        String processDefinitionJson,
                                                        String desc,
                                                        String locations,
@@ -190,8 +191,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
         }
 
         ProcessDefinition processDefinition = new ProcessDefinition();
-        Date now = new Date();
-
         ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
         Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
         if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) {
@@ -206,68 +205,10 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
             putMsg(result, Status.CREATE_PROCESS_DEFINITION);
             return result;
         }
-
-        processDefinition.setName(name);
-        processDefinition.setVersion(1);
-        processDefinition.setReleaseState(ReleaseState.OFFLINE);
-        processDefinition.setUserId(loginUser.getId());
-        processDefinition.setDescription(desc);
-        processDefinition.setLocations(locations);
-        processDefinition.setConnects(connects);
-        processDefinition.setTimeout(processData.getTimeout());
-        processDefinition.setTenantId(processData.getTenantId());
-        processDefinition.setModifyBy(loginUser.getUserName());
-
-        //custom global params
-        List<Property> globalParamsList = processData.getGlobalParams();
-        if (CollectionUtils.isNotEmpty(globalParamsList)) {
-            Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
-            globalParamsList = new ArrayList<>(globalParamsSet);
-            processDefinition.setGlobalParamList(globalParamsList);
-        }
-        processDefinition.setCreateTime(now);
-        processDefinition.setUpdateTime(now);
-        processDefinition.setFlag(Flag.YES);
-
-        // save the new process definition
-        processDefinitionMapper.insert(processDefinition);
-
-        // parse and save the taskDefinition and processTaskRelation
-        try {
-            List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
-
-            for (TaskNode task : taskNodeList) {
-                taskDefinitionService.createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task));
-            }
-
-            DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
-            Collection<String> beginNode = dag.getBeginNode();
-            Collection<String> endNode = dag.getEndNode();
-
-            // TODO:  query taskCode by  projectCode and taskName
-
-            processTaskRelationService.createProcessTaskRelation(
-                    loginUser,
-                    name,
-                    projectName,
-                    processDefinitionCode,
-                    0L,
-                    0L,
-                    "0",
-                    "");
-
-        } catch (Exception e) {
-            putMsg(result, Status.CREATE_PROCESS_DEFINITION);
-            return result;
-        }
-
-        // save process definition log
-        ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject(
-                JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class);
-
-        processDefinitionLog.setOperator(loginUser.getId());
-        processDefinitionLog.setOperateTime(now);
-        processDefinitionLogMapper.insert(processDefinitionLog);
+        ProcessDefinitionLog processDefinitionLog = processService.insertProcessDefinitionLog(loginUser, processDefinitionCode, processDefinitionName, processData,
+                project, desc, locations, connects);
+        processService.switchVersion(processDefinition, processDefinitionLog);
+        processService.createTaskAndRelation(loginUser, projectName, "", processDefinition, processData);
 
         // return processDefinition object with ID
         result.put(Constants.DATA_LIST, processDefinition.getId());
@@ -275,6 +216,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
         return result;
     }
 
+
     /**
      * get resource ids
      *
@@ -475,7 +417,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
             putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
             return result;
         }
-
         if (!name.equals(processDefinition.getName())) {
             // check whether the new process define name exist
             ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getId(), name);
@@ -487,55 +428,11 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
         // get the processdefinitionjson before saving,and then save the name and taskid
         String oldJson = processDefinition.getProcessDefinitionJson();
         processDefinitionJson = processService.changeJson(processData, oldJson);
-
-        // update TaskDefinition
         ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
-        List<TaskNode> taskNodeList = (newProcessData.getTasks() == null) ? new ArrayList<>() : newProcessData.getTasks();
+        int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc,
+                locations, connects, newProcessData, processDefinition);
 
-        for (TaskNode task : taskNodeList) {
-            // TODO update by code directly
-            Map<String, Object> stringObjectMap = taskDefinitionService.queryTaskDefinitionByName(loginUser, projectName, task.getName());
-            TaskDefinition taskDefinition = (TaskDefinition) stringObjectMap.get(Constants.DATA_LIST);
-            taskDefinitionService.updateTaskDefinition(loginUser, projectName, taskDefinition.getCode(), JSONUtils.toJsonString(task));
-        }
-
-        List<ProcessDefinitionLog> processDefinitionLogs = processDefinitionLogMapper.queryByDefinitionCode(processDefinition.getCode());
-        int version = getNextVersion(processDefinitionLogs);
-
-        Date now = new Date();
-        processDefinition.setVersion(version);
-        processDefinition.setName(name);
-        processDefinition.setReleaseState(ReleaseState.OFFLINE);
-        processDefinition.setProjectCode(project.getCode());
-        processDefinition.setDescription(desc);
-        processDefinition.setLocations(locations);
-        processDefinition.setConnects(connects);
-        processDefinition.setTimeout(processData.getTimeout());
-        processDefinition.setTenantId(processData.getTenantId());
-
-        //custom global params
-        List<Property> globalParamsList = new ArrayList<>();
-        if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) {
-            Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
-            globalParamsList = new ArrayList<>(userDefParamsSet);
-        }
-        processDefinition.setGlobalParamList(globalParamsList);
-        processDefinition.setUpdateTime(now);
-        processDefinition.setFlag(Flag.YES);
-
-
-        processDefinition.setVersion(version);
-        int update = processDefinitionMapper.updateById(processDefinition);
-
-        // save processDefinitionLog
-        ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject(
-                JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class);
-
-        processDefinitionLog.setOperator(loginUser.getId());
-        processDefinitionLog.setOperateTime(now);
-        int insert = processDefinitionLogMapper.insert(processDefinitionLog);
-
-        if (update > 0 && insert > 0) {
+        if (saveResult > 0) {
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
         } else {
@@ -544,13 +441,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
         return result;
     }
 
-    private int getNextVersion(List<ProcessDefinitionLog>  processDefinitionLogs) {
-        return processDefinitionLogs
-                .stream()
-                .map(ProcessDefinitionLog::getVersion)
-                .max((x, y) -> x > y ? x : y)
-                .orElse(0) + 1;
-    }
 
     /**
      * verify process definition name unique
@@ -1732,18 +1622,8 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
                     , version);
             return result;
         }
-
-        processDefinition.setVersion(processDefinitionLog.getVersion());
-        processDefinition.setDescription(processDefinitionLog.getDescription());
-        processDefinition.setLocations(processDefinitionLog.getLocations());
-        processDefinition.setConnects(processDefinitionLog.getConnects());
-        processDefinition.setTimeout(processDefinitionLog.getTimeout());
-        processDefinition.setGlobalParams(processDefinitionLog.getGlobalParams());
-        processDefinition.setUpdateTime(new Date());
-        processDefinition.setWarningGroupId(processDefinitionLog.getWarningGroupId());
-        processDefinition.setResourceIds(processDefinitionLog.getResourceIds());
-
-        if (processDefinitionMapper.updateById(processDefinition) > 0) {
+        int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
+        if (switchVersion > 0) {
             putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index ed50788..e140052 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.Date;
 import java.util.HashMap;
@@ -88,6 +89,9 @@ public class TaskDefinitionServiceImpl extends BaseService implements
     @Autowired
     private ProcessDefinitionMapper processDefinitionMapper;
 
+    @Autowired
+    private ProcessService processService;
+
     /**
      * create task definition
      *
@@ -144,7 +148,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
                 taskNode.getTaskTimeoutParameter().getInterval(),
                 now,
                 now);
-        taskDefinition.setResourceIds(getResourceIds(taskDefinition));
+        taskDefinition.setResourceIds(processService.getResourceIds(taskDefinition));
         // save the new task definition
         taskDefinitionMapper.insert(taskDefinition);
         // save task definition log
@@ -160,30 +164,6 @@ public class TaskDefinitionServiceImpl extends BaseService implements
     }
 
     /**
-     * get resource ids
-     *
-     * @param taskDefinition taskDefinition
-     * @return resource ids
-     */
-    private String getResourceIds(TaskDefinition taskDefinition) {
-        Set<Integer> resourceIds = null;
-        // TODO modify taskDefinition.getTaskType()
-        AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp(), taskDefinition.getTaskParams());
-
-        if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
-            resourceIds = params.getResourceFilesList().
-                    stream()
-                    .filter(t -> t.getId() != 0)
-                    .map(ResourceInfo::getId)
-                    .collect(Collectors.toSet());
-        }
-        if (CollectionUtils.isEmpty(resourceIds)) {
-            return StringUtils.EMPTY;
-        }
-        return StringUtils.join(resourceIds, ",");
-    }
-
-    /**
      * query task definition
      *
      * @param loginUser login user
@@ -276,38 +256,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
             return result;
         }
 
-        List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByDefinitionCode(taskCode);
-        int version = taskDefinitionLogs
-                .stream()
-                .map(TaskDefinitionLog::getVersion)
-                .max((x, y) -> x > y ? x : y)
-                .orElse(0) + 1;
-        Date now = new Date();
-        taskDefinition.setVersion(version);
-        taskDefinition.setCode(taskCode);
-        taskDefinition.setName(taskNode.getName());
-        taskDefinition.setDescription(taskNode.getDesc());
-        taskDefinition.setProjectCode(project.getCode());
-        taskDefinition.setUserId(loginUser.getId());
-        taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
-        taskDefinition.setTaskParams(taskNode.getParams());
-        taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
-        taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
-        taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
-        taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes());
-        taskDefinition.setFailRetryInterval(taskNode.getRetryInterval());
-        taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
-        taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
-        taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
-        taskDefinition.setUpdateTime(now);
-        taskDefinition.setResourceIds(getResourceIds(taskDefinition));
-        taskDefinitionMapper.updateById(taskDefinition);
-        // save task definition log
-        TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
-        taskDefinitionLog.set(taskDefinition);
-        taskDefinitionLog.setOperator(loginUser.getId());
-        taskDefinitionLog.setOperateTime(now);
-        taskDefinitionLogMapper.insert(taskDefinitionLog);
+        processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition);
         result.put(Constants.DATA_LIST, taskCode);
         putMsg(result, Status.SUCCESS);
         return result;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index 6779258..5271392 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -184,6 +184,8 @@ public class ProcessDefinition {
     @TableField(exist = false)
     private int warningGroupId;
 
+    public ProcessDefinition() {}
+
     public String getName() {
         return name;
     }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index f1d43a3..aa6b4b8 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -47,10 +47,23 @@ public class ProcessInstance {
      */
     @TableId(value = "id", type = IdType.AUTO)
     private int id;
+
     /**
      * process definition id
+     * TODO delete
      */
     private int processDefinitionId;
+
+    /**
+     * process definition code
+     */
+    private Long processDefinitionCode;
+
+    /**
+     * process definition version
+     */
+    private int processDefinitionVersion;
+
     /**
      * process state
      */
@@ -145,6 +158,7 @@ public class ProcessInstance {
 
     /**
      * process instance json
+     * TODO delete
      */
     private String processInstanceJson;
 
@@ -579,6 +593,22 @@ public class ProcessInstance {
         this.tenantId = tenantId;
     }
 
+    public Long getProcessDefinitionCode() {
+        return processDefinitionCode;
+    }
+
+    public void setProcessDefinitionCode(Long processDefinitionCode) {
+        this.processDefinitionCode = processDefinitionCode;
+    }
+
+    public int getProcessDefinitionVersion() {
+        return processDefinitionVersion;
+    }
+
+    public void setProcessDefinitionVersion(int processDefinitionVersion) {
+        this.processDefinitionVersion = processDefinitionVersion;
+    }
+
     @Override
     public String toString() {
         return "ProcessInstance{"
@@ -651,6 +681,12 @@ public class ProcessInstance {
                 + timeout
                 + ", tenantId="
                 + tenantId
+                + ", processDefinitionCode='"
+                + processDefinitionCode
+                + '\''
+                + ", processDefinitionVersion='"
+                + processDefinitionVersion
+                + '\''
                 + '}';
     }
 
@@ -672,4 +708,5 @@ public class ProcessInstance {
     public int hashCode() {
         return Objects.hash(id);
     }
+
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index b13ca87..e35cf8f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -58,7 +58,9 @@ public class TaskInstance implements Serializable {
 
     /**
      * process definition id
+     * TODO delete
      */
+    @TableField(exist = false)
     private int processDefinitionId;
 
     /**
@@ -67,6 +69,21 @@ public class TaskInstance implements Serializable {
     private int processInstanceId;
 
     /**
+     * task code
+     */
+    private long taskCode;
+
+    /**
+     *  process definition code
+     */
+    private long processDefinitionCode;
+
+    /**
+     * task definition version
+     */
+    private String taskDefinitionVersion;
+
+    /**
      * process instance name
      */
     @TableField(exist = false)
@@ -74,7 +91,9 @@ public class TaskInstance implements Serializable {
 
     /**
      * task json
+     * TODO delete
      */
+    @TableField(exist = false)
     private String taskJson;
 
     /**
@@ -601,4 +620,28 @@ public class TaskInstance implements Serializable {
                 + ", delayTime=" + delayTime
                 + '}';
     }
+
+    public long getTaskCode() {
+        return taskCode;
+    }
+
+    public void setTaskCode(long taskCode) {
+        this.taskCode = taskCode;
+    }
+
+    public long getProcessDefinitionCode() {
+        return processDefinitionCode;
+    }
+
+    public void setProcessDefinitionCode(long processDefinitionCode) {
+        this.processDefinitionCode = processDefinitionCode;
+    }
+
+    public String getTaskDefinitionVersion() {
+        return taskDefinitionVersion;
+    }
+
+    public void setTaskDefinitionVersion(String taskDefinitionVersion) {
+        this.taskDefinitionVersion = taskDefinitionVersion;
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
index 80a426a..18c3b96 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
@@ -46,12 +46,19 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
     List<ProcessDefinitionLog> queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
 
     /**
+     * query max version for definition
+     *
+     * @param processDefinitionCode process definition code
+     * @return max version of the process definition log
+     */
+    int queryMaxVersionForDefinition(@Param("processDefinitionCode") long processDefinitionCode);
+
+    /**
      * query the certain process definition version info by process definition code and version number
      *
      * @param processDefinitionCode process definition code
      * @param version version number
      * @return the process definition version info
      */
-    ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("processDefinitionCode") Long processDefinitionCode, @Param("version") long version);
-
+    ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("processDefinitionCode") Long processDefinitionCode,
+                                                         @Param("version") long version);
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
index 80cca29..60108fb 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
@@ -25,7 +25,6 @@
          warning_group_id, timeout, tenant_id,operator, operate_time, create_time,
           update_time
     </sql>
-
     <select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
         select pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code,
         pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations, pd.connects,
@@ -37,15 +36,13 @@
         WHERE p.code = #{projectCode}
         and pd.name = #{processDefinitionName}
     </select>
-
     <select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
         select
         <include refid="baseSql"/>
         from t_ds_process_definition_log
         WHERE pd.code = #{processDefinitionCode}
     </select>
-
-    <select id="queryByProcessDefinitionCodeAndVersion"
+    <select id="queryByDefinitionCodeAndVersion"
             resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
         select
         <include refid="baseSql"/>
@@ -53,5 +50,10 @@
         where code = #{processDefinitionCode}
         and version = #{version}
     </select>
+    <select id="queryMaxVersionForDefinition" resultType="java.lang.Integer">
+        select max(version)
+        from t_ds_process_definition_log
+        where code = #{processDefinitionCode}
 
+    </select>
 </mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index f661635..39ccdeb 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -19,7 +19,7 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
 <mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper">
     <sql id="baseSql">
-        id, name, process_definition_id, state, recovery, start_time, end_time, run_times,host,
+        id, name, process_definition_id, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host,
         command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
         warning_group_id, schedule_time, command_start_time, global_params, process_instance_json, flag,
         update_time, is_sub_process, executor_id, locations, connects, history_cmd, dependence_schedule_times,
@@ -88,7 +88,7 @@
     </select>
 
     <select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
-        select instance.id, instance.process_definition_id, instance.command_type, instance.executor_id,
+        select instance.id, instance.process_definition_id, instance.command_type, instance.executor_id, instance.process_definition_version,
         instance.name, instance.state, instance.schedule_time, instance.start_time, instance.end_time,
         instance.run_times, instance.recovery, instance.host
         from t_ds_process_instance instance
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 71bd251..43cfe39 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -19,13 +19,13 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
 <mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper">
     <sql id="baseSql">
-        id, name, task_type, process_definition_id, process_instance_id, task_json, state, submit_time,
+        id, name, task_type, process_instance_id, task_code, task_definition_version, process_definition_code, state, submit_time,
         start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
         flag, retry_interval, max_retry_times, task_instance_priority, worker_group, executor_id,
         first_submit_time, delay_time, var_pool
     </sql>
     <sql id="baseSqlV2">
-        ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.process_definition_id, ${alias}.process_instance_id, ${alias}.task_json, ${alias}.state, ${alias}.submit_time,
+        ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_definition_code, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
         ${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
         ${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group, ${alias}.executor_id,
         ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.var_pool
@@ -72,7 +72,7 @@
     <select id="countTaskInstanceStateByUser" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
         select state, count(0) as count
         from t_ds_task_instance t
-        left join t_ds_process_definition d on d.id=t.process_definition_id
+        left join t_ds_process_definition d on d.code=t.process_definition_code
         left join t_ds_project p on p.id=d.project_id
         where 1=1
         <if test="projectIds != null and projectIds.length != 0">
@@ -98,7 +98,7 @@
     <select id="countTask" resultType="java.lang.Integer">
         select count(1) as count
         from t_ds_task_instance task,t_ds_process_definition process
-        where task.process_definition_id=process.id
+        where task.process_definition_code=process.code
         <if test="projectIds != null and projectIds.length != 0">
             and process.project_id in
             <foreach collection="projectIds" index="index" item="i" open="(" separator="," close=")">
@@ -120,7 +120,7 @@
         ,
         process.name as process_instance_name
         from t_ds_task_instance instance
-        left join t_ds_process_definition define on instance.process_definition_id = define.id
+        left join t_ds_process_definition define on instance.process_definition_code = define.code
         left join t_ds_process_instance process on process.id=instance.process_instance_id
         where define.project_id = #{projectId}
         <if test="startTime != null">
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b9065ec..4acf6fe 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -35,13 +35,17 @@ import org.apache.dolphinscheduler.common.enums.CycleEnum;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.model.DateInterval;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
 import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@@ -49,18 +53,22 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.CycleDependency;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
 import org.apache.dolphinscheduler.dao.entity.ProcessData;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.ProjectUser;
 import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
@@ -68,12 +76,15 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
@@ -126,6 +137,11 @@ public class ProcessService {
     private ProcessDefinitionMapper processDefineMapper;
 
     @Autowired
+    private ProcessDefinitionLogMapper processDefineLogMapper;
+
+    @Autowired
     private ProcessInstanceMapper processInstanceMapper;
 
     @Autowired
@@ -158,6 +174,15 @@ public class ProcessService {
     @Autowired
     private ProjectMapper projectMapper;
 
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    @Autowired
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
     /**
      * handle Command (construct ProcessInstance from Command) , wrapped in transaction
      *
@@ -340,6 +365,37 @@ public class ProcessService {
     }
 
     /**
+     * find process definition by code and version.
+     *
+     * @param processDefinitionCode process definition code
+     * @param version process definition version
+     * @return process definition
+     */
+    public ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version) {
+        ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
+        if (processDefinition.getVersion() != version) {
+            ProcessDefinitionLog log = processDefineLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, version);
+            processDefinition = convertFromLog(log);
+        }
+        return processDefinition;
+    }
+
+    /**
+     * convert a process definition log to a process definition
+     *
+     * @param processDefinitionLog process definition log
+     * @return process definition, or null if the log is null
+     */
+    public ProcessDefinition convertFromLog(ProcessDefinitionLog processDefinitionLog) {
+        ProcessDefinition definition = null;
+        if (null != processDefinitionLog) {
+            definition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog), ProcessDefinition.class);
+        }
+        if (null != definition) {
+            definition.setId(0);
+        }
+        return definition;
+    }
+
+    /**
      * delete work process instance by id
      *
      * @param processInstanceId processInstanceId
@@ -2055,4 +2111,196 @@ public class ProcessService {
         }
         return JSONUtils.toJsonString(processData);
     }
+
+    /**
+     * switch process definition version to process definition log version
+     *
+     * @param processDefinition process definition
+     * @param processDefinitionLog process definition log to switch to
+     * @return switch result, EXIT_CODE_FAILURE when either argument is null
+     */
+    public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
+        if (null == processDefinition || null == processDefinitionLog) {
+            return Constants.EXIT_CODE_FAILURE;
+        }
+
+        ProcessDefinition tmpDefinition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog),
+                ProcessDefinition.class);
+        tmpDefinition.setId(processDefinition.getId());
+        tmpDefinition.setReleaseState(ReleaseState.OFFLINE);
+        tmpDefinition.setFlag(Flag.YES);
+
+        int switchResult = 0;
+        if (0 == processDefinition.getId()) {
+            switchResult = processDefineMapper.insert(tmpDefinition);
+        } else {
+            switchResult = processDefineMapper.updateById(tmpDefinition);
+        }
+        //TODO... switch task relations
+        return switchResult;
+    }
+
+    /**
+     * update task definition and save a task definition log
+     *
+     * @param operator operator user
+     * @param projectCode project code
+     * @param taskNode task node parsed from json
+     * @param taskDefinition task definition to update
+     * @return update result
+     */
+    public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {
+
+        List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByDefinitionCode(taskDefinition.getCode());
+        int version = taskDefinitionLogs
+                .stream()
+                .map(TaskDefinitionLog::getVersion)
+                .max((x, y) -> x > y ? x : y)
+                .orElse(0) + 1;
+        Date now = new Date();
+        taskDefinition.setVersion(version);
+        taskDefinition.setName(taskNode.getName());
+        taskDefinition.setDescription(taskNode.getDesc());
+        taskDefinition.setProjectCode(projectCode);
+        taskDefinition.setUserId(operator.getId());
+        taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
+        taskDefinition.setTaskParams(taskNode.getParams());
+        taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
+        taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
+        taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
+        taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes());
+        taskDefinition.setFailRetryInterval(taskNode.getRetryInterval());
+        taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
+        taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
+        taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
+        taskDefinition.setUpdateTime(now);
+        taskDefinition.setResourceIds(getResourceIds(taskDefinition));
+        int update = taskDefinitionMapper.updateById(taskDefinition);
+        // save task definition log
+        TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
+        taskDefinitionLog.set(taskDefinition);
+        taskDefinitionLog.setOperator(operator.getId());
+        taskDefinitionLog.setOperateTime(now);
+        int insert = taskDefinitionLogMapper.insert(taskDefinitionLog);
+        return insert & update;
+    }
+
+    /**
+     * get resource ids
+     *
+     * @param taskDefinition taskDefinition
+     * @return resource ids
+     */
+    public String getResourceIds(TaskDefinition taskDefinition) {
+        Set<Integer> resourceIds = null;
+        // TODO modify taskDefinition.getTaskType()
+        AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp(), taskDefinition.getTaskParams());
+
+        if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
+            resourceIds = params.getResourceFilesList().
+                    stream()
+                    .filter(t -> t.getId() != 0)
+                    .map(ResourceInfo::getId)
+                    .collect(Collectors.toSet());
+        }
+        if (CollectionUtils.isEmpty(resourceIds)) {
+            return StringUtils.EMPTY;
+        }
+        return StringUtils.join(resourceIds, ",");
+    }
+
+    /**
+     * save process definition: update task definitions, create task relations,
+     * insert a process definition log and switch the definition to it
+     *
+     * @param operator operator user
+     * @param project project
+     * @param name process definition name
+     * @param desc description
+     * @param locations node locations
+     * @param connects node connects
+     * @param processData process data
+     * @param processDefinition process definition
+     * @return switch version result
+     */
+    public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
+                                     String connects, ProcessData processData, ProcessDefinition processDefinition) {
+        List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
+        for (TaskNode task : taskNodeList) {
+            // TODO update by code directly
+            TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(project.getCode(), task.getName());
+            updateTaskDefinition(operator, project.getCode(), task, taskDefinition);
+        }
+        createTaskAndRelation(operator, project.getName(), "", processDefinition, processData);
+        ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(),
+                name, processData, project, desc, locations, connects);
+        return switchVersion(processDefinition, processDefinitionLog);
+    }
+
+    /**
+     * insert a process definition log
+     *
+     * @param operator operator user
+     * @param processDefinitionCode process definition code
+     * @param processDefinitionName process definition name
+     * @param processData process data
+     * @param project project
+     * @param desc description
+     * @param locations node locations
+     * @param connects node connects
+     * @return the inserted process definition log, or null if the insert failed
+     */
+    public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName,
+                                                           ProcessData processData, Project project,
+                                                           String desc, String locations, String connects) {
+        ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
+        int version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionCode) + 1;
+        processDefinitionLog.setCode(processDefinitionCode);
+        processDefinitionLog.setVersion(version);
+        processDefinitionLog.setName(processDefinitionName);
+        processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
+        processDefinitionLog.setProjectCode(project.getCode());
+        processDefinitionLog.setDescription(desc);
+        processDefinitionLog.setLocations(locations);
+        processDefinitionLog.setConnects(connects);
+        processDefinitionLog.setTimeout(processData.getTimeout());
+        processDefinitionLog.setTenantId(processData.getTenantId());
+        processDefinitionLog.setOperator(operator.getId());
+        Date updateTime = new Date();
+        processDefinitionLog.setOperateTime(updateTime);
+        processDefinitionLog.setUpdateTime(updateTime);
+
+        //custom global params
+        List<Property> globalParamsList = new ArrayList<>();
+        if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) {
+            Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
+            globalParamsList = new ArrayList<>(userDefParamsSet);
+        }
+        processDefinitionLog.setGlobalParamList(globalParamsList);
+        processDefinitionLog.setFlag(Flag.YES);
+        int insert = processDefinitionLogMapper.insert(processDefinitionLog);
+        if (insert > 0) {
+            return processDefinitionLog;
+        }
+        return null;
+    }
+
+    /**
+     * create task definitions and task relations
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param relationName relation name
+     * @param processDefinition process definition
+     * @param processData process data
+     */
+    public void createTaskAndRelation(User loginUser, String projectName, String relationName,
+                                      ProcessDefinition processDefinition,
+                                      ProcessData processData) {
+        List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
+        for (TaskNode task : taskNodeList) {
+            //TODO... task code exists, update task
+            //createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task));
+        }
+        // TODO:  query taskCode by  projectCode and taskName
+    }
+
 }
diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql
index 430f03a..d1439f6 100644
--- a/sql/dolphinscheduler-postgre.sql
+++ b/sql/dolphinscheduler-postgre.sql
@@ -674,10 +674,10 @@ CREATE TABLE t_ds_task_instance (
   id int NOT NULL  ,
   name varchar(255) DEFAULT NULL ,
   task_type varchar(64) DEFAULT NULL ,
+  task_code bigint NOT NULL,
   task_definition_version int DEFAULT NULL ,
   process_definition_code bigint DEFAULT NULL ,
   process_instance_id int DEFAULT NULL ,
-  task_json text ,
   state int DEFAULT NULL ,
   submit_time timestamp DEFAULT NULL ,
   start_time timestamp DEFAULT NULL ,
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index bdc5dd8..a9d4752 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -810,11 +810,11 @@ DROP TABLE IF EXISTS `t_ds_task_instance`;
 CREATE TABLE `t_ds_task_instance` (
   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
   `name` varchar(255) DEFAULT NULL COMMENT 'task name',
-  `task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version',
   `task_type` varchar(64) DEFAULT NULL COMMENT 'task type',
+  `task_code` bigint(20) NOT NULL COMMENT 'task definition code',
+  `task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version',
   `process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
   `process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id',
-  `task_json` longtext COMMENT 'task content json',
   `state` tinyint(4) DEFAULT NULL COMMENT 'Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete',
   `submit_time` datetime DEFAULT NULL COMMENT 'task submit time',
   `start_time` datetime DEFAULT NULL COMMENT 'task start time',