You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/02/22 02:13:41 UTC

[incubator-dolphinscheduler] branch json_split updated: [Feature][JsonSplit]refactor remove the json in process instance and definition (#4828)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new 62e961e  [Feature][JsonSplit]refactor remove the json in process instance and definition (#4828)
62e961e is described below

commit 62e961e3f1bab666781df982d2284720db434461
Author: bao liang <29...@users.noreply.github.com>
AuthorDate: Mon Feb 22 10:13:32 2021 +0800

    [Feature][JsonSplit]refactor remove the json in process instance and definition (#4828)
    
    * #4417 [JsonSplit] refactor process definition json
    
    * [#4177] refactor json split: remove the json from process definition and process instance
    
    * [#4177] refactor json split: copying a process definition needs to set each task code to 0L.
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * update
    
    * update
---
 .../service/impl/ProcessDefinitionServiceImpl.java |   8 +-
 .../impl/ProcessDefinitionVersionServiceImpl.java  |   1 -
 .../service/impl/ProcessInstanceServiceImpl.java   |   6 -
 .../impl/ProcessTaskRelationServiceImpl.java       |   2 +-
 .../service/impl/TaskDefinitionServiceImpl.java    |   3 +-
 .../dolphinscheduler/common/model/TaskNode.java    | 665 +++++++++++----------
 .../dolphinscheduler/dao/utils/DagHelper.java      |  52 +-
 .../server/master/runner/MasterExecThread.java     |  17 +-
 .../service/process/ProcessService.java            | 118 +---
 .../service/process/ProcessServiceTest.java        | 104 ----
 10 files changed, 406 insertions(+), 570 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 2b9e7a3..f7ddf06 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -427,9 +427,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 return result;
             }
         }
-        // get the processdefinitionjson before saving,and then save the name and taskid
-        String oldJson = processDefinition.getProcessDefinitionJson();
-        processDefinitionJson = processService.changeJson(processData, oldJson);
         ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
         int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc,
                 locations, connects, newProcessData, processDefinition);
@@ -1452,6 +1449,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
             return result;
         } else {
+            ProcessData processData = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson(), ProcessData.class);
+            List<TaskNode> taskNodeList = processData.getTasks();
+            taskNodeList.stream().forEach(taskNode -> {
+                taskNode.setCode(0L);
+            });
             return createProcessDefinition(
                     loginUser,
                     targetProject.getName(),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
index 64dcd87..0764810 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
@@ -71,7 +71,6 @@ public class ProcessDefinitionVersionServiceImpl extends BaseServiceImpl impleme
                 .newBuilder()
                 .processDefinitionId(processDefinition.getId())
                 .version(version)
-                .processDefinitionJson(processDefinition.getProcessDefinitionJson())
                 .description(processDefinition.getDescription())
                 .locations(processDefinition.getLocations())
                 .connects(processDefinition.getConnects())
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index aeb23df..905b245 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -457,11 +457,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         }
         Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
                 processDefinition.getUserId());
-        // get the processinstancejson before saving,and then save the name and taskid
-        String oldJson = processInstance.getProcessInstanceJson();
-        if (StringUtils.isNotEmpty(oldJson)) {
-            processInstanceJson = processService.changeJson(processData, oldJson);
-        }
         setProcessInstance(processInstance, tenant, scheduleTime, locations,
                 connects, processInstanceJson, processData);
         int update = processService.updateProcessInstance(processInstance);
@@ -530,7 +525,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         processInstance.setProcessInstanceJson(processInstanceJson);
         processInstance.setGlobalParams(globalParams);
     }
-
     /**
      * query parent process instance detail info by sub process instance id
      *
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index 5ea01bd..e57ef00 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -41,7 +41,7 @@ import org.springframework.stereotype.Service;
  * task definition service impl
  */
 @Service
-public class ProcessTaskRelationServiceImpl extends BaseService implements
+public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements
         ProcessTaskRelationService {
 
     private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 8b1e672..8e32133 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -56,8 +56,7 @@ import org.springframework.transaction.annotation.Transactional;
  * task definition service impl
  */
 @Service
-public class TaskDefinitionServiceImpl extends BaseService implements
-        TaskDefinitionService {
+public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDefinitionService {
 
     private static final Logger logger = LoggerFactory.getLogger(TaskDefinitionServiceImpl.class);
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index cd3e573..0072b93 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -23,9 +23,11 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.*;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 
 import java.io.IOException;
@@ -35,330 +37,299 @@ import java.util.Objects;
 
 public class TaskNode {
 
-  /**
-   * task node id
-   */
-  private String id;
-
-  /**
-   * task node name
-   */
-  private String name;
-
-  /**
-   * task node description
-   */
-  private String desc;
-
-  /**
-   * task node type
-   */
-  private String type;
-
-  /**
-   * the run flag has two states, NORMAL or FORBIDDEN
-   */
-  private String runFlag;
-
-  /**
-   * the front field
-   */
-  private String loc;
-
-  /**
-   * maximum number of retries
-   */
-  private int maxRetryTimes;
-
-  /**
-   * Unit of retry interval: points
-   */
-  private int retryInterval;
-
-  /**
-   * params information
-   */
-  @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
-  @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
-  private String params;
-
-  /**
-   * inner dependency information
-   */
-  @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
-  @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
-  private String preTasks;
-
-  /**
-   * users store additional information
-   */
-  @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
-  @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
-  private String extras;
-
-  /**
-   * node dependency list
-   */
-  private List<String> depList;
-
-  /**
-   * outer dependency information
-   */
-  @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
-  @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
-  private String dependence;
-
-
-  @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
-  @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
-  private String conditionResult;
-
-  /**
-   *  task instance priority
-   */
-  private Priority taskInstancePriority;
-
-  /**
-   * worker group
-   */
-  private String workerGroup;
-
-  /**
-   * worker group id
-   */
-  private Integer workerGroupId;
-
-
-  /**
-   * task time out
-   */
-  @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
-  @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
-  private String timeout;
+    /**
+     * task node id
+     */
+    private String id;
+
+    /**
+     * task node code
+     */
+    private Long code;
+
+    /**
+     * task node version
+     */
+    private int version;
+
+    /**
+     * task node name
+     */
+    private String name;
+
+    /**
+     * task node description
+     */
+    private String desc;
+
+    /**
+     * task node type
+     */
+    private String type;
+
+    /**
+     * the run flag has two states, NORMAL or FORBIDDEN
+     */
+    private String runFlag;
+
+    /**
+     * the front field
+     */
+    private String loc;
+
+    /**
+     * maximum number of retries
+     */
+    private int maxRetryTimes;
+
+    /**
+     * Unit of retry interval: points
+     */
+    private int retryInterval;
+
+    /**
+     * params information
+     */
+    @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+    @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+    private String params;
+
+    /**
+     * inner dependency information
+     */
+    @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+    @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+    private String preTasks;
+
+    /**
+     * users store additional information
+     */
+    @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+    @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+    private String extras;
+
+    /**
+     * node dependency list
+     */
+    private List<String> depList;
+
+    /**
+     * outer dependency information
+     */
+    @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+    @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+    private String dependence;
+
+
+    @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+    @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+    private String conditionResult;
+
+    /**
+     * task instance priority
+     */
+    private Priority taskInstancePriority;
+
+    /**
+     * worker group
+     */
+    private String workerGroup;
+
+    /**
+     * worker group id
+     */
+    private Integer workerGroupId;
+
+
+    /**
+     * task time out
+     */
+    @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+    @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+    private String timeout;
 
     /**
      * delay execution time.
      */
     private int delayTime;
 
-  public String getId() {
-    return id;
-  }
-
-  public void setId(String id) {
-    this.id = id;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  public String getDesc() {
-    return desc;
-  }
-
-  public void setDesc(String desc) {
-    this.desc = desc;
-  }
-
-  public String getType() {
-    return type;
-  }
-
-  public void setType(String type) {
-    this.type = type;
-  }
-
-  public String getParams() {
-    return params;
-  }
-
-  public void setParams(String params) {
-    this.params = params;
-  }
-
-  public String getPreTasks() {
-    return preTasks;
-  }
-
-  public void setPreTasks(String preTasks) throws IOException {
-    this.preTasks = preTasks;
-    this.depList = JSONUtils.toList(preTasks, String.class);
-  }
-
-  public String getExtras() {
-    return extras;
-  }
-
-  public void setExtras(String extras) {
-    this.extras = extras;
-  }
-
-  public List<String> getDepList() {
-    return depList;
-  }
-
-  public void setDepList(List<String> depList) throws JsonProcessingException {
-    this.depList = depList;
-    this.preTasks = JSONUtils.toJsonString(depList);
-  }
-
-  public String getLoc() {
-    return loc;
-  }
-
-  public void setLoc(String loc) {
-    this.loc = loc;
-  }
-
-  public String getRunFlag(){
-    return runFlag;
-  }
-
-  public void setRunFlag(String runFlag) {
-    this.runFlag = runFlag;
-  }
-
-  public Boolean isForbidden(){
-    return (StringUtils.isNotEmpty(this.runFlag) &&
-            this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN));
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-        return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-        return false;
-    }
-    TaskNode taskNode = (TaskNode) o;
-    return  Objects.equals(name, taskNode.name) &&
-            Objects.equals(desc, taskNode.desc) &&
-            Objects.equals(type, taskNode.type) &&
-            Objects.equals(params, taskNode.params) &&
-            Objects.equals(preTasks, taskNode.preTasks) &&
-            Objects.equals(extras, taskNode.extras) &&
-            Objects.equals(runFlag, taskNode.runFlag) &&
-            Objects.equals(dependence, taskNode.dependence) &&
-            Objects.equals(workerGroup, taskNode.workerGroup) &&
-            Objects.equals(conditionResult, taskNode.conditionResult) &&
-
-            CollectionUtils.equalLists(depList, taskNode.depList);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag);
-  }
-
-  public String getDependence() {
-    return dependence;
-  }
-
-  public void setDependence(String dependence) {
-    this.dependence = dependence;
-  }
-
-  public int getMaxRetryTimes() {
-    return maxRetryTimes;
-  }
-
-  public void setMaxRetryTimes(int maxRetryTimes) {
-    this.maxRetryTimes = maxRetryTimes;
-  }
-
-  public int getRetryInterval() {
-    return retryInterval;
-  }
-
-  public void setRetryInterval(int retryInterval) {
-    this.retryInterval = retryInterval;
-  }
-
-  public Priority getTaskInstancePriority() {
-    return taskInstancePriority;
-  }
-
-  public void setTaskInstancePriority(Priority taskInstancePriority) {
-    this.taskInstancePriority = taskInstancePriority;
-  }
-
-  public String getTimeout() {
-    return timeout;
-  }
-
-  public void setTimeout(String timeout) {
-    this.timeout = timeout;
-  }
-
-  /**
-   * get task time out parameter
-   * @return task time out parameter
-   */
-  public TaskTimeoutParameter getTaskTimeoutParameter() {
-    if(StringUtils.isNotEmpty(this.getTimeout())){
-      String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name());
-      String taskTimeout = this.getTimeout().replace(formatStr,TaskTimeoutStrategy.WARNFAILED.name());
-      return JSONUtils.parseObject(taskTimeout,TaskTimeoutParameter.class);
-    }
-    return new TaskTimeoutParameter(false);
-  }
-
-  public boolean isConditionsTask(){
-    return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType());
-  }
-
-  @Override
-  public String toString() {
-        return "TaskNode{"
-            + "id='" + id + '\''
-            + ", name='" + name + '\''
-            + ", desc='" + desc + '\''
-            + ", type='" + type + '\''
-            + ", runFlag='" + runFlag + '\''
-            + ", loc='" + loc + '\''
-            + ", maxRetryTimes=" + maxRetryTimes
-            + ", retryInterval=" + retryInterval
-            + ", params='" + params + '\''
-            + ", preTasks='" + preTasks + '\''
-            + ", extras='" + extras + '\''
-            + ", depList=" + depList
-            + ", dependence='" + dependence + '\''
-            + ", taskInstancePriority=" + taskInstancePriority
-            + ", timeout='" + timeout + '\''
-            + ", workerGroup='" + workerGroup + '\''
-            + ", delayTime=" + delayTime
-            + '}';
-  }
-
-  public String getWorkerGroup() {
-    return workerGroup;
-  }
-
-  public void setWorkerGroup(String workerGroup) {
-    this.workerGroup = workerGroup;
-  }
-
-  public String getConditionResult() {
-    return conditionResult;
-  }
-
-  public void setConditionResult(String conditionResult) {
-    this.conditionResult = conditionResult;
-  }
-
-  public Integer getWorkerGroupId() {
-    return workerGroupId;
-  }
-
-  public void setWorkerGroupId(Integer workerGroupId) {
-    this.workerGroupId = workerGroupId;
-  }
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+
+    public void setDesc(String desc) {
+        this.desc = desc;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getParams() {
+        return params;
+    }
+
+    public void setParams(String params) {
+        this.params = params;
+    }
+
+    public String getPreTasks() {
+        return preTasks;
+    }
+
+    public void setPreTasks(String preTasks) throws IOException {
+        this.preTasks = preTasks;
+        this.depList = JSONUtils.toList(preTasks, String.class);
+    }
+
+    public String getExtras() {
+        return extras;
+    }
+
+    public void setExtras(String extras) {
+        this.extras = extras;
+    }
+
+    public List<String> getDepList() {
+        return depList;
+    }
+
+    public void setDepList(List<String> depList) throws JsonProcessingException {
+        this.depList = depList;
+        this.preTasks = JSONUtils.toJsonString(depList);
+    }
+
+    public String getLoc() {
+        return loc;
+    }
+
+    public void setLoc(String loc) {
+        this.loc = loc;
+    }
+
+    public String getRunFlag() {
+        return runFlag;
+    }
+
+    public void setRunFlag(String runFlag) {
+        this.runFlag = runFlag;
+    }
+
+    public Boolean isForbidden() {
+        return (StringUtils.isNotEmpty(this.runFlag)
+                && this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TaskNode taskNode = (TaskNode) o;
+        return Objects.equals(name, taskNode.name)
+                && Objects.equals(desc, taskNode.desc)
+                && Objects.equals(type, taskNode.type)
+                && Objects.equals(params, taskNode.params)
+                && Objects.equals(preTasks, taskNode.preTasks)
+                && Objects.equals(extras, taskNode.extras)
+                && Objects.equals(runFlag, taskNode.runFlag)
+                && Objects.equals(dependence, taskNode.dependence)
+                && Objects.equals(workerGroup, taskNode.workerGroup)
+                && Objects.equals(conditionResult, taskNode.conditionResult)
+                && CollectionUtils.equalLists(depList, taskNode.depList);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag);
+    }
+
+    public String getDependence() {
+        return dependence;
+    }
+
+    public void setDependence(String dependence) {
+        this.dependence = dependence;
+    }
+
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    public void setMaxRetryTimes(int maxRetryTimes) {
+        this.maxRetryTimes = maxRetryTimes;
+    }
+
+    public int getRetryInterval() {
+        return retryInterval;
+    }
+
+    public void setRetryInterval(int retryInterval) {
+        this.retryInterval = retryInterval;
+    }
+
+    public Priority getTaskInstancePriority() {
+        return taskInstancePriority;
+    }
+
+    public void setTaskInstancePriority(Priority taskInstancePriority) {
+        this.taskInstancePriority = taskInstancePriority;
+    }
+
+    public String getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(String timeout) {
+        this.timeout = timeout;
+    }
+
+    public String getWorkerGroup() {
+        return workerGroup;
+    }
+
+    public void setWorkerGroup(String workerGroup) {
+        this.workerGroup = workerGroup;
+    }
+
+    public String getConditionResult() {
+        return conditionResult;
+    }
+
+    public void setConditionResult(String conditionResult) {
+        this.conditionResult = conditionResult;
+    }
+
+    public Integer getWorkerGroupId() {
+        return workerGroupId;
+    }
+
+    public void setWorkerGroupId(Integer workerGroupId) {
+        this.workerGroupId = workerGroupId;
+    }
 
     public int getDelayTime() {
         return delayTime;
@@ -367,4 +338,62 @@ public class TaskNode {
     public void setDelayTime(int delayTime) {
         this.delayTime = delayTime;
     }
+
+    public Long getCode() {
+        return code;
+    }
+
+    public void setCode(Long code) {
+        this.code = code;
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * get task time out parameter
+     *
+     * @return task time out parameter
+     */
+    public TaskTimeoutParameter getTaskTimeoutParameter() {
+        if (StringUtils.isNotEmpty(this.getTimeout())) {
+            String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name());
+            String taskTimeout = this.getTimeout().replace(formatStr, TaskTimeoutStrategy.WARNFAILED.name());
+            return JSONUtils.parseObject(taskTimeout, TaskTimeoutParameter.class);
+        }
+        return new TaskTimeoutParameter(false);
+    }
+
+    public boolean isConditionsTask() {
+        return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType());
+    }
+
+    @Override
+    public String toString() {
+        return "TaskNode{"
+                + "id='" + id + '\''
+                + ", name='" + name + '\''
+                + ", desc='" + desc + '\''
+                + ", type='" + type + '\''
+                + ", runFlag='" + runFlag + '\''
+                + ", loc='" + loc + '\''
+                + ", maxRetryTimes=" + maxRetryTimes
+                + ", retryInterval=" + retryInterval
+                + ", params='" + params + '\''
+                + ", preTasks='" + preTasks + '\''
+                + ", extras='" + extras + '\''
+                + ", depList=" + depList
+                + ", dependence='" + dependence + '\''
+                + ", taskInstancePriority=" + taskInstancePriority
+                + ", timeout='" + timeout + '\''
+                + ", workerGroup='" + workerGroup + '\''
+                + ", delayTime=" + delayTime
+                + '}';
+    }
+
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index f7eaabc..970fc47 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -25,17 +25,13 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
 import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessData;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * dag tools
@@ -193,24 +189,19 @@ public class DagHelper {
     /**
      * generate dag by start nodes and recovery nodes
      *
-     * @param processDefinitionJson processDefinitionJson
+     * @param totalTaskNodeList     totalTaskNodeList
      * @param startNodeNameList     startNodeNameList
      * @param recoveryNodeNameList  recoveryNodeNameList
      * @param depNodeType           depNodeType
      * @return process dag
      * @throws Exception if error throws Exception
      */
-    public static ProcessDag generateFlowDag(String processDefinitionJson,
+    public static ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
                                              List<String> startNodeNameList,
                                              List<String> recoveryNodeNameList,
                                              TaskDependType depNodeType) throws Exception {
-        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
 
-        List<TaskNode> taskNodeList = new ArrayList<>();
-        if (null != processData) {
-            taskNodeList = processData.getTasks();
-        }
-        List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(taskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
+        List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
         if (destTaskNodeList.isEmpty()) {
             return null;
         }
@@ -222,29 +213,6 @@ public class DagHelper {
     }
 
     /**
-     * parse the forbidden task nodes in process definition.
-     *
-     * @param processDefinitionJson processDefinitionJson
-     * @return task node map
-     */
-    public static Map<String, TaskNode> getForbiddenTaskNodeMaps(String processDefinitionJson) {
-        Map<String, TaskNode> forbidTaskNodeMap = new ConcurrentHashMap<>();
-        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
-
-        List<TaskNode> taskNodeList = new ArrayList<>();
-        if (null != processData) {
-            taskNodeList = processData.getTasks();
-        }
-        for (TaskNode node : taskNodeList) {
-            if (node.isForbidden()) {
-                forbidTaskNodeMap.putIfAbsent(node.getName(), node);
-            }
-        }
-        return forbidTaskNodeMap;
-    }
-
-
-    /**
      * find node by node name
      *
      * @param nodeDetails nodeDetails
@@ -470,18 +438,16 @@ public class DagHelper {
     /**
      * get process dag
      *
-     * @param taskDefinitions task definition
+     * @param taskNodeList task node list
      * @return Process dag
      */
-    public static ProcessDag getProcessDag(List<TaskDefinition> taskDefinitions,
+    public static ProcessDag getProcessDag(List<TaskNode> taskNodeList,
                                            List<ProcessTaskRelation> processTaskRelations) {
         Map<Long, TaskNode> taskNodeMap = new HashMap<>();
-        List<TaskNode> taskNodeList = new ArrayList<>();
-        for (TaskDefinition taskDefinition : taskDefinitions) {
-            TaskNode taskNode = JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinition), TaskNode.class);
-            taskNodeMap.put(taskDefinition.getCode(), taskNode);
-            taskNodeList.add(taskNode);
-        }
+
+        taskNodeList.stream().forEach(taskNode -> {
+            taskNodeMap.putIfAbsent(taskNode.getCode(), taskNode);
+        });
 
         List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
         for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 3b113b6..ff44842 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -383,12 +383,17 @@ public class MasterExecThread implements Runnable {
      */
     private void buildFlowDag() throws Exception {
         recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
-
-        forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
+        List<TaskNode> taskNodeList = processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId());
+        forbiddenTaskList.clear();
+        taskNodeList.stream().forEach(taskNode -> {
+            if (taskNode.isForbidden()) {
+                forbiddenTaskList.put(taskNode.getName(), taskNode);
+            }
+        });
         // generate process to get DAG info
         List<String> recoveryNameList = getRecoveryNodeNameList();
         List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
-        ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(),
+        ProcessDag processDag = generateFlowDag(taskNodeList,
                 startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
         if (processDag == null) {
             logger.error("processDag is null");
@@ -1229,17 +1234,17 @@ public class MasterExecThread implements Runnable {
     /**
      * generate flow dag
      *
-     * @param processDefinitionJson process definition json
+     * @param totalTaskNodeList     all task nodes the dag is generated from
      * @param startNodeNameList     start node name list
      * @param recoveryNodeNameList  recovery node name list
      * @param depNodeType           depend node type
      * @return ProcessDag           process dag
      * @throws Exception exception
      */
-    public ProcessDag generateFlowDag(String processDefinitionJson,
+    public ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
                                       List<String> startNodeNameList,
                                       List<String> recoveryNodeNameList,
                                       TaskDependType depNodeType) throws Exception {
-        return DagHelper.generateFlowDag(processDefinitionJson, startNodeNameList, recoveryNodeNameList, depNodeType);
+        return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
     }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 472f98a..cc82844 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -341,20 +341,22 @@ public class ProcessService {
     public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId) {
         ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
         if (processDefinition == null) {
-            logger.info("process define not exists");
-            return null;
-        }
-
-        String processDefinitionJson = processDefinition.getProcessDefinitionJson();
-        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
-
-        //process data check
-        if (null == processData) {
-            logger.error("process data is null");
+            logger.error("process define not exists");
             return new ArrayList<>();
         }
 
-        return processData.getTasks();
+        List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
+        Map<Long, TaskDefinition> taskDefinitionMap = new HashMap<>();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+            // skip post task codes that are already loaded, so each task definition is queried only once
+            if (!taskDefinitionMap.containsKey(processTaskRelation.getPostTaskCode())) {
+                TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(processTaskRelation.getPostTaskCode());
+                taskDefinitionMap.put(processTaskRelation.getPostTaskCode(), taskDefinition);
+            }
+        }
+        return taskDefinitionMap.values().stream()
+                .map(taskDefinition -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinition), TaskNode.class))
+                .collect(Collectors.toList());
     }
 
     /**
@@ -495,13 +497,7 @@ public class ProcessService {
      * @param ids ids
      */
     public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
-        ProcessDefinition processDefinition = processDefineMapper.selectById(parentId);
-        String processDefinitionJson = processDefinition.getProcessDefinitionJson();
-
-        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
-
-        List<TaskNode> taskNodeList = processData.getTasks();
-
+        List<TaskNode> taskNodeList = this.getTaskNodeListByDefinitionId(parentId);
         if (taskNodeList != null && !taskNodeList.isEmpty()) {
 
             for (TaskNode taskNode : taskNodeList) {
@@ -650,8 +646,6 @@ public class ProcessService {
                 getCommandTypeIfComplement(processInstance, command),
                 processInstance.getScheduleTime()));
 
-        //copy process define json to process instance
-        processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
         // set process instance priority
         processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
         String workerGroup = StringUtils.isBlank(command.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : command.getWorkerGroup();
@@ -2064,60 +2058,6 @@ public class ProcessService {
                 taskInstance.getId());
     }
 
-    /**
-     * solve the branch rename bug
-     *
-     * @return String
-     */
-    public String changeJson(ProcessData processData, String oldJson) {
-        ProcessData oldProcessData = JSONUtils.parseObject(oldJson, ProcessData.class);
-        HashMap<String, String> oldNameTaskId = new HashMap<>();
-        List<TaskNode> oldTasks = oldProcessData.getTasks();
-        for (int i = 0; i < oldTasks.size(); i++) {
-            TaskNode taskNode = oldTasks.get(i);
-            String oldName = taskNode.getName();
-            String oldId = taskNode.getId();
-            oldNameTaskId.put(oldName, oldId);
-        }
-
-        // take the processdefinitionjson saved this time, and then save the taskid and name
-        HashMap<String, String> newNameTaskId = new HashMap<>();
-        List<TaskNode> newTasks = processData.getTasks();
-        for (int i = 0; i < newTasks.size(); i++) {
-            TaskNode taskNode = newTasks.get(i);
-            String newId = taskNode.getId();
-            String newName = taskNode.getName();
-            newNameTaskId.put(newId, newName);
-        }
-
-        // replace the previous conditionresult with a new one
-        List<TaskNode> tasks = processData.getTasks();
-        for (int i = 0; i < tasks.size(); i++) {
-            TaskNode taskNode = newTasks.get(i);
-            String type = taskNode.getType();
-            if (TaskType.CONDITIONS.getDescp().equalsIgnoreCase(type)) {
-                ConditionsParameters conditionsParameters = JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
-                String oldSuccessNodeName = conditionsParameters.getSuccessNode().get(0);
-                String oldFailedNodeName = conditionsParameters.getFailedNode().get(0);
-                String newSuccessNodeName = newNameTaskId.get(oldNameTaskId.get(oldSuccessNodeName));
-                String newFailedNodeName = newNameTaskId.get(oldNameTaskId.get(oldFailedNodeName));
-                if (newSuccessNodeName != null) {
-                    ArrayList<String> successNode = new ArrayList<>();
-                    successNode.add(newSuccessNodeName);
-                    conditionsParameters.setSuccessNode(successNode);
-                }
-                if (newFailedNodeName != null) {
-                    ArrayList<String> failedNode = new ArrayList<>();
-                    failedNode.add(newFailedNodeName);
-                    conditionsParameters.setFailedNode(failedNode);
-                }
-                String conditionResultStr = conditionsParameters.getConditionResult();
-                taskNode.setConditionResult(conditionResultStr);
-                tasks.set(i, taskNode);
-            }
-        }
-        return JSONUtils.toJsonString(processData);
-    }
 
     /**
      * switch process definition version to process definition log version
@@ -2391,24 +2331,30 @@ public class ProcessService {
      * @return dag graph
      */
     public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
+        // load the task nodes and the task relations of this process definition
+        List<TaskNode> nodes = getTaskNodeListByDefinitionId(processDefinition.getId());
+        List<ProcessTaskRelation> relations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
+        // build the process dag first, then turn it into the concrete dag graph to be executed
+        return DagHelper.buildDagGraph(DagHelper.getProcessDag(nodes, relations));
+    }
 
+    /**
+     * get process task relation list
+     * the relations are read from the relation log records matching the process code and version
+     *
+     * @param processCode process definition code used to query the relation log
+     * @param processVersion process definition version used to query the relation log
+     * @return process task relations converted from the queried relation log records
+     */
+    public List<ProcessTaskRelation> getProcessTaskRelationList(Long processCode, int processVersion) {
         List<ProcessTaskRelationLog> taskRelationLogs = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
-                processDefinition.getCode(),
-                processDefinition.getVersion());
+                processCode,
+                processVersion);
         List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
-        List<TaskDefinition> taskDefinitions = new ArrayList<>();
         for (ProcessTaskRelationLog processTaskRelationLog : taskRelationLogs) {
             processTaskRelations.add(JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class));
-
-            TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
-                    processTaskRelationLog.getPostTaskCode(),
-                    processTaskRelationLog.getPostNodeVersion());
-            taskDefinitions.add(JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class));
         }
-
-        ProcessDag processDag = DagHelper.getProcessDag(taskDefinitions, processTaskRelations);
-        // Generate concrete Dag to be executed
-        return DagHelper.buildDagGraph(processDag);
+        return processTaskRelations;
     }
 
 }
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index b6d518c..cfa8caa 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -338,108 +338,4 @@ public class ProcessServiceTest {
         processService.recurseFindSubProcessId(parentId, ids);
 
     }
-
-    @Test
-    public void testChangeJson() {
-
-        ProcessData oldProcessData = new ProcessData();
-        ConditionsParameters conditionsParameters = new ConditionsParameters();
-        ArrayList<TaskNode> tasks = new ArrayList<>();
-        TaskNode taskNode = new TaskNode();
-        TaskNode taskNode11 = new TaskNode();
-        TaskNode taskNode111 = new TaskNode();
-        ArrayList<String> successNode = new ArrayList<>();
-        ArrayList<String> faildNode = new ArrayList<>();
-
-        taskNode.setName("bbb");
-        taskNode.setType("SHELL");
-        taskNode.setId("222");
-
-        taskNode11.setName("vvv");
-        taskNode11.setType("CONDITIONS");
-        taskNode11.setId("444");
-        successNode.add("bbb");
-        faildNode.add("ccc");
-
-        taskNode111.setName("ccc");
-        taskNode111.setType("SHELL");
-        taskNode111.setId("333");
-
-        conditionsParameters.setSuccessNode(successNode);
-        conditionsParameters.setFailedNode(faildNode);
-        taskNode11.setConditionResult(conditionsParameters.getConditionResult());
-        tasks.add(taskNode);
-        tasks.add(taskNode11);
-        tasks.add(taskNode111);
-        oldProcessData.setTasks(tasks);
-
-        ProcessData newProcessData = new ProcessData();
-        ConditionsParameters conditionsParameters2 = new ConditionsParameters();
-        TaskNode taskNode2 = new TaskNode();
-        TaskNode taskNode22 = new TaskNode();
-        TaskNode taskNode222 = new TaskNode();
-        ArrayList<TaskNode> tasks2 = new ArrayList<>();
-        ArrayList<String> successNode2 = new ArrayList<>();
-        ArrayList<String> faildNode2 = new ArrayList<>();
-
-        taskNode2.setName("bbbchange");
-        taskNode2.setType("SHELL");
-        taskNode2.setId("222");
-
-        taskNode22.setName("vv");
-        taskNode22.setType("CONDITIONS");
-        taskNode22.setId("444");
-        successNode2.add("bbb");
-        faildNode2.add("ccc");
-
-        taskNode222.setName("ccc");
-        taskNode222.setType("SHELL");
-        taskNode222.setId("333");
-
-        conditionsParameters2.setSuccessNode(successNode2);
-        conditionsParameters2.setFailedNode(faildNode2);
-        taskNode22.setConditionResult(conditionsParameters2.getConditionResult());
-        tasks2.add(taskNode2);
-        tasks2.add(taskNode22);
-        tasks2.add(taskNode222);
-
-        newProcessData.setTasks(tasks2);
-
-        ProcessData exceptProcessData = new ProcessData();
-        ConditionsParameters conditionsParameters3 = new ConditionsParameters();
-        TaskNode taskNode3 = new TaskNode();
-        TaskNode taskNode33 = new TaskNode();
-        TaskNode taskNode333 = new TaskNode();
-        ArrayList<TaskNode> tasks3 = new ArrayList<>();
-        ArrayList<String> successNode3 = new ArrayList<>();
-        ArrayList<String> faildNode3 = new ArrayList<>();
-
-        taskNode3.setName("bbbchange");
-        taskNode3.setType("SHELL");
-        taskNode3.setId("222");
-
-        taskNode33.setName("vv");
-        taskNode33.setType("CONDITIONS");
-        taskNode33.setId("444");
-        successNode3.add("bbbchange");
-        faildNode3.add("ccc");
-
-        taskNode333.setName("ccc");
-        taskNode333.setType("SHELL");
-        taskNode333.setId("333");
-
-        conditionsParameters3.setSuccessNode(successNode3);
-        conditionsParameters3.setFailedNode(faildNode3);
-        taskNode33.setConditionResult(conditionsParameters3.getConditionResult());
-        tasks3.add(taskNode3);
-        tasks3.add(taskNode33);
-        tasks3.add(taskNode333);
-        exceptProcessData.setTasks(tasks3);
-
-        String expect = JSONUtils.toJsonString(exceptProcessData);
-        String oldJson = JSONUtils.toJsonString(oldProcessData);
-
-        Assert.assertEquals(expect, processService.changeJson(newProcessData, oldJson));
-
-    }
 }