You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2021/08/03 03:54:47 UTC

[dolphinscheduler] branch json_split_two updated: [Fix-5519] The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code (#5937)

This is an automated email from the ASF dual-hosted git repository.

wenhemin pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split_two by this push:
     new 92b92b5  [Fix-5519] The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code (#5937)
92b92b5 is described below

commit 92b92b5f5a6d5202f031c781001a14725fb6219e
Author: wen-hemin <39...@users.noreply.github.com>
AuthorDate: Tue Aug 3 11:54:39 2021 +0800

    [Fix-5519] The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code (#5937)
    
    * fix: the t_ds_schedules table, process_definition_id change to process_definition_code
    
    * fix checkstyle
    
    * fix: recovery code
    
    * fix UT
    
    * fix: The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code
    
    * fix comment
    
    * fix checkstyle
    
    * fix: remove duplacated lines
    
    * fix: remove TODO
    
    Co-authored-by: wen-hemin <we...@apache.com>
---
 .../api/service/impl/ExecutorServiceImpl.java      | 26 +++---
 .../dolphinscheduler/dao/entity/Command.java       | 71 ++++++++--------
 .../dolphinscheduler/dao/entity/ErrorCommand.java  | 99 +++++++++-------------
 .../dolphinscheduler/dao/mapper/CommandMapper.xml  |  6 +-
 .../dao/mapper/ErrorCommandMapper.xml              |  4 +-
 .../dao/mapper/CommandMapperTest.java              | 73 ++++++++--------
 .../dao/mapper/ErrorCommandMapperTest.java         | 11 +--
 .../server/master/MasterCommandTest.java           | 22 ++---
 .../service/process/ProcessService.java            | 72 +++++++++++-----
 .../service/quartz/ProcessScheduleJob.java         |  2 +-
 .../service/process/ProcessServiceTest.java        | 60 ++++++++-----
 sql/dolphinscheduler_mysql.sql                     |  4 +-
 sql/dolphinscheduler_postgre.sql                   |  4 +-
 13 files changed, 243 insertions(+), 211 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 8978257..c3b422f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -160,7 +160,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         /**
          * create command
          */
-        int create = this.createCommand(commandType, processDefinition.getId(),
+        int create = this.createCommand(commandType, processDefinition.getCode(),
                 taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
                 warningGroupId, runMode, processInstancePriority, workerGroup, startParams);
 
@@ -276,13 +276,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
 
         switch (executeType) {
             case REPEAT_RUNNING:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING, startParams);
+                result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.REPEAT_RUNNING, startParams);
                 break;
             case RECOVER_SUSPENDED_PROCESS:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
+                result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
                 break;
             case START_FAILURE_TASK_PROCESS:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
+                result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
                 break;
             case STOP:
                 if (processInstance.getState() == ExecutionStatus.READY_STOP) {
@@ -394,11 +394,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      *
      * @param loginUser           login user
      * @param instanceId          instance id
-     * @param processDefinitionId process definition id
+     * @param processDefinitionCode process definition code
      * @param commandType         command type
      * @return insert result code
      */
-    private Map<String, Object> insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType, String startParams) {
+    private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, CommandType commandType, String startParams) {
         Map<String, Object> result = new HashMap<>();
 
         //To add startParams only when repeat running is needed
@@ -410,12 +410,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
 
         Command command = new Command();
         command.setCommandType(commandType);
-        command.setProcessDefinitionId(processDefinitionId);
+        command.setProcessDefinitionCode(processDefinitionCode);
         command.setCommandParam(JSONUtils.toJsonString(cmdParam));
         command.setExecutorId(loginUser.getId());
 
         if (!processService.verifyIsNeedCreateCommand(command)) {
-            putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionId);
+            putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode);
             return result;
         }
 
@@ -475,7 +475,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * create command
      *
      * @param commandType commandType
-     * @param processDefineId processDefineId
+     * @param processDefineCode processDefineCode
      * @param nodeDep nodeDep
      * @param failureStrategy failureStrategy
      * @param startNodeList startNodeList
@@ -488,7 +488,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * @param workerGroup workerGroup
      * @return command id
      */
-    private int createCommand(CommandType commandType, int processDefineId,
+    private int createCommand(CommandType commandType, long processDefineCode,
                               TaskDependType nodeDep, FailureStrategy failureStrategy,
                               String startNodeList, String schedule, WarningType warningType,
                               int executorId, int warningGroupId,
@@ -506,7 +506,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         } else {
             command.setCommandType(commandType);
         }
-        command.setProcessDefinitionId(processDefineId);
+        command.setProcessDefinitionCode(processDefineCode);
         if (nodeDep != null) {
             command.setTaskDependType(nodeDep);
         }
@@ -549,7 +549,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                     command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                     return processService.createCommand(command);
                 } else if (runMode == RunMode.RUN_MODE_PARALLEL) {
-                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineId); // TODO: next pr change to code
+                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineCode);
                     List<Date> listDate = new LinkedList<>();
                     if (!CollectionUtils.isEmpty(schedules)) {
                         for (Schedule item : schedules) {
@@ -580,7 +580,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                     }
                 }
             } else {
-                logger.error("there is not valid schedule date for the process definition: id:{}", processDefineId);
+                logger.error("there is not valid schedule date for the process definition code:{}", processDefineCode);
             }
         } else {
             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
index cba0151..bef466e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
@@ -14,15 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.dao.entity;
 
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+
+import java.util.Date;
+
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
-import org.apache.dolphinscheduler.common.enums.*;
-
-import java.util.Date;
 
 /**
  * command
@@ -33,7 +39,7 @@ public class Command {
     /**
      * id
      */
-    @TableId(value="id", type=IdType.AUTO)
+    @TableId(value = "id", type = IdType.AUTO)
     private int id;
 
     /**
@@ -43,10 +49,10 @@ public class Command {
     private CommandType commandType;
 
     /**
-     * process definition id
+     * process definition code
      */
-    @TableField("process_definition_id")
-    private int processDefinitionId;
+    @TableField("process_definition_code")
+    private long processDefinitionCode;
 
     /**
      * executor id
@@ -126,7 +132,7 @@ public class Command {
             TaskDependType taskDependType,
             FailureStrategy failureStrategy,
             int executorId,
-            int processDefinitionId,
+            long processDefinitionCode,
             String commandParam,
             WarningType warningType,
             int warningGroupId,
@@ -135,7 +141,7 @@ public class Command {
             Priority processInstancePriority) {
         this.commandType = commandType;
         this.executorId = executorId;
-        this.processDefinitionId = processDefinitionId;
+        this.processDefinitionCode = processDefinitionCode;
         this.commandParam = commandParam;
         this.warningType = warningType;
         this.warningGroupId = warningGroupId;
@@ -148,7 +154,6 @@ public class Command {
         this.processInstancePriority = processInstancePriority;
     }
 
-
     public TaskDependType getTaskDependType() {
         return taskDependType;
     }
@@ -173,15 +178,14 @@ public class Command {
         this.commandType = commandType;
     }
 
-    public int getProcessDefinitionId() {
-        return processDefinitionId;
+    public long getProcessDefinitionCode() {
+        return processDefinitionCode;
     }
 
-    public void setProcessDefinitionId(int processDefinitionId) {
-        this.processDefinitionId = processDefinitionId;
+    public void setProcessDefinitionCode(long processDefinitionCode) {
+        this.processDefinitionCode = processDefinitionCode;
     }
 
-
     public FailureStrategy getFailureStrategy() {
         return failureStrategy;
     }
@@ -276,7 +280,7 @@ public class Command {
         if (id != command.id) {
             return false;
         }
-        if (processDefinitionId != command.processDefinitionId) {
+        if (processDefinitionCode != command.processDefinitionCode) {
             return false;
         }
         if (executorId != command.executorId) {
@@ -320,7 +324,7 @@ public class Command {
     public int hashCode() {
         int result = id;
         result = 31 * result + (commandType != null ? commandType.hashCode() : 0);
-        result = 31 * result + processDefinitionId;
+        result = 31 * result + Long.hashCode(processDefinitionCode);
         result = 31 * result + executorId;
         result = 31 * result + (commandParam != null ? commandParam.hashCode() : 0);
         result = 31 * result + (taskDependType != null ? taskDependType.hashCode() : 0);
@@ -334,24 +338,25 @@ public class Command {
         result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0);
         return result;
     }
+
     @Override
     public String toString() {
-        return "Command{" +
-                "id=" + id +
-                ", commandType=" + commandType +
-                ", processDefinitionId=" + processDefinitionId +
-                ", executorId=" + executorId +
-                ", commandParam='" + commandParam + '\'' +
-                ", taskDependType=" + taskDependType +
-                ", failureStrategy=" + failureStrategy +
-                ", warningType=" + warningType +
-                ", warningGroupId=" + warningGroupId +
-                ", scheduleTime=" + scheduleTime +
-                ", startTime=" + startTime +
-                ", processInstancePriority=" + processInstancePriority +
-                ", updateTime=" + updateTime +
-                ", workerGroup='" + workerGroup + '\'' +
-                '}';
+        return "Command{"
+                + "id=" + id
+                + ", commandType=" + commandType
+                + ", processDefinitionCode=" + processDefinitionCode
+                + ", executorId=" + executorId
+                + ", commandParam='" + commandParam + '\''
+                + ", taskDependType=" + taskDependType
+                + ", failureStrategy=" + failureStrategy
+                + ", warningType=" + warningType
+                + ", warningGroupId=" + warningGroupId
+                + ", scheduleTime=" + scheduleTime
+                + ", startTime=" + startTime
+                + ", processInstancePriority=" + processInstancePriority
+                + ", updateTime=" + updateTime
+                + ", workerGroup='" + workerGroup + '\''
+                + '}';
     }
 }
 
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
index 760bb23..c4a5e60 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
@@ -14,15 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.dao.entity;
 
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+
+import java.util.Date;
+
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import com.fasterxml.jackson.annotation.JsonFormat;
-import org.apache.dolphinscheduler.common.enums.*;
-
-import java.util.Date;
 
 /**
  * command
@@ -33,7 +39,7 @@ public class ErrorCommand {
     /**
      * id
      */
-    @TableId(value="id", type = IdType.INPUT)
+    @TableId(value = "id", type = IdType.INPUT)
     private int id;
 
     /**
@@ -42,9 +48,9 @@ public class ErrorCommand {
     private CommandType commandType;
 
     /**
-     * process definition id
+     * process definition code
      */
-    private int processDefinitionId;
+    private long processDefinitionCode;
 
     /**
      * executor id
@@ -79,13 +85,13 @@ public class ErrorCommand {
     /**
      * schedule time
      */
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
     private Date scheduleTime;
 
     /**
      * start time
      */
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
     private Date startTime;
 
     /**
@@ -96,7 +102,7 @@ public class ErrorCommand {
     /**
      * update time
      */
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
     private Date updateTime;
 
     /**
@@ -111,11 +117,11 @@ public class ErrorCommand {
 
     public ErrorCommand(){}
 
-    public ErrorCommand(Command command, String message){
+    public ErrorCommand(Command command, String message) {
         this.id = command.getId();
         this.commandType = command.getCommandType();
         this.executorId = command.getExecutorId();
-        this.processDefinitionId = command.getProcessDefinitionId();
+        this.processDefinitionCode = command.getProcessDefinitionCode();
         this.commandParam = command.getCommandParam();
         this.warningType = command.getWarningType();
         this.warningGroupId = command.getWarningGroupId();
@@ -128,34 +134,6 @@ public class ErrorCommand {
         this.message = message;
     }
 
-    public ErrorCommand(
-            CommandType commandType,
-            TaskDependType taskDependType,
-            FailureStrategy failureStrategy,
-            int executorId,
-            int processDefinitionId,
-            String commandParam,
-            WarningType warningType,
-            int warningGroupId,
-            Date scheduleTime,
-            Priority processInstancePriority,
-            String message){
-        this.commandType = commandType;
-        this.executorId = executorId;
-        this.processDefinitionId = processDefinitionId;
-        this.commandParam = commandParam;
-        this.warningType = warningType;
-        this.warningGroupId = warningGroupId;
-        this.scheduleTime = scheduleTime;
-        this.taskDependType = taskDependType;
-        this.failureStrategy = failureStrategy;
-        this.startTime = new Date();
-        this.updateTime = new Date();
-        this.processInstancePriority = processInstancePriority;
-        this.message = message;
-    }
-
-
     public TaskDependType getTaskDependType() {
         return taskDependType;
     }
@@ -180,15 +158,14 @@ public class ErrorCommand {
         this.commandType = commandType;
     }
 
-    public int getProcessDefinitionId() {
-        return processDefinitionId;
+    public long getProcessDefinitionCode() {
+        return processDefinitionCode;
     }
 
-    public void setProcessDefinitionId(int processDefinitionId) {
-        this.processDefinitionId = processDefinitionId;
+    public void setProcessDefinitionCode(long processDefinitionCode) {
+        this.processDefinitionCode = processDefinitionCode;
     }
 
-
     public FailureStrategy getFailureStrategy() {
         return failureStrategy;
     }
@@ -279,22 +256,22 @@ public class ErrorCommand {
 
     @Override
     public String toString() {
-        return "ErrorCommand{" +
-                "id=" + id +
-                ", commandType=" + commandType +
-                ", processDefinitionId=" + processDefinitionId +
-                ", executorId=" + executorId +
-                ", commandParam='" + commandParam + '\'' +
-                ", taskDependType=" + taskDependType +
-                ", failureStrategy=" + failureStrategy +
-                ", warningType=" + warningType +
-                ", warningGroupId=" + warningGroupId +
-                ", scheduleTime=" + scheduleTime +
-                ", startTime=" + startTime +
-                ", processInstancePriority=" + processInstancePriority +
-                ", updateTime=" + updateTime +
-                ", message='" + message + '\'' +
-                ", workerGroup='" + workerGroup + '\'' +
-                '}';
+        return "ErrorCommand{"
+                + "id=" + id
+                + ", commandType=" + commandType
+                + ", processDefinitionCode=" + processDefinitionCode
+                + ", executorId=" + executorId
+                + ", commandParam='" + commandParam + '\''
+                + ", taskDependType=" + taskDependType
+                + ", failureStrategy=" + failureStrategy
+                + ", warningType=" + warningType
+                + ", warningGroupId=" + warningGroupId
+                + ", scheduleTime=" + scheduleTime
+                + ", startTime=" + startTime
+                + ", processInstancePriority=" + processInstancePriority
+                + ", updateTime=" + updateTime
+                + ", message='" + message + '\''
+                + ", workerGroup='" + workerGroup + '\''
+                + '}';
     }
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index c0728f2..38f5bff 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -19,11 +19,11 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
 <mapper namespace="org.apache.dolphinscheduler.dao.mapper.CommandMapper">
     <select id="getOneToRun" resultType="org.apache.dolphinscheduler.dao.entity.Command">
-        select cmd.id, cmd.command_type, cmd.process_definition_id, cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
+        select cmd.id, cmd.command_type, cmd.process_definition_code, cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
         cmd.warning_type, cmd.warning_group_id, cmd.schedule_time, cmd.start_time, cmd.executor_id, cmd.update_time,
         cmd.process_instance_priority, cmd.worker_group
         from t_ds_command cmd
-        join t_ds_process_definition definition on cmd.process_definition_id = definition.id
+        join t_ds_process_definition definition on cmd.process_definition_code = definition.code
         where definition.release_state = 1 AND definition.flag = 1
         order by cmd.update_time asc
         limit 1
@@ -31,7 +31,7 @@
     <select id="countCommandState" resultType="org.apache.dolphinscheduler.dao.entity.CommandCount">
         select cmd.command_type as command_type, count(1) as count
         from t_ds_command cmd, t_ds_process_definition process
-        where cmd.process_definition_id = process.id
+        where cmd.process_definition_code = process.code
         <if test="projectCodeArray != null and projectCodeArray.length != 0">
             and process.project_code in
             <foreach collection="projectCodeArray" index="index" item="i" open="(" close=")" separator=",">
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml
index 5f93854..8179ff4 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml
@@ -21,7 +21,7 @@
     <select id="countCommandState" resultType="org.apache.dolphinscheduler.dao.entity.CommandCount">
         select cmd.command_type as command_type, count(1) as count
         from t_ds_error_command cmd, t_ds_process_definition process
-        where cmd.process_definition_id = process.id
+        where cmd.process_definition_code = process.code
         <if test="projectCodeArray != null and projectCodeArray.length != 0">
             and process.project_code in
             <foreach collection="projectCodeArray" index="index" item="i" open="(" close=")" separator=",">
@@ -33,4 +33,4 @@
         </if>
         group by cmd.command_type
     </select>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
index dd73bc3..dc9dbee 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
@@ -14,15 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.dao.mapper;
 
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.CommandCount;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.common.enums.*;
-import org.junit.Assert;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -31,14 +50,6 @@ import org.springframework.test.annotation.Rollback;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.*;
-
 /**
  *  command mapper test
  */
@@ -48,24 +59,21 @@ import static org.junit.Assert.*;
 @Rollback(true)
 public class CommandMapperTest {
 
-
     @Autowired
     CommandMapper commandMapper;
 
     @Autowired
     ProcessDefinitionMapper processDefinitionMapper;
 
-
     /**
      * test insert
      */
     @Test
-    public void testInsert(){
+    public void testInsert() {
         Command command = createCommand();
         assertThat(command.getId(),greaterThan(0));
     }
 
-
     /**
      * test select by id
      */
@@ -76,14 +84,14 @@ public class CommandMapperTest {
         Command actualCommand = commandMapper.selectById(expectedCommand.getId());
 
         assertNotNull(actualCommand);
-        assertEquals(expectedCommand.getProcessDefinitionId(), actualCommand.getProcessDefinitionId());
+        assertEquals(expectedCommand.getProcessDefinitionCode(), actualCommand.getProcessDefinitionCode());
     }
 
     /**
      * test update
      */
     @Test
-    public void testUpdate(){
+    public void testUpdate() {
 
         Command expectedCommand = createCommand();
 
@@ -103,7 +111,7 @@ public class CommandMapperTest {
      * test delete
      */
     @Test
-    public void testDelete(){
+    public void testDelete() {
         Command expectedCommand = createCommand();
 
         commandMapper.deleteById(expectedCommand.getId());
@@ -124,7 +132,6 @@ public class CommandMapperTest {
 
         Map<Integer, Command> commandMap = createCommandMap(count);
 
-
         List<Command> actualCommands = commandMapper.selectList(null);
 
         assertThat(actualCommands.size(), greaterThanOrEqualTo(count));
@@ -138,7 +145,7 @@ public class CommandMapperTest {
 
         ProcessDefinition processDefinition = createProcessDefinition();
 
-        Command expectedCommand = createCommand(CommandType.START_PROCESS,processDefinition.getId());
+        createCommand(CommandType.START_PROCESS, processDefinition.getCode());
 
         Command actualCommand = commandMapper.getOneToRun();
 
@@ -154,7 +161,7 @@ public class CommandMapperTest {
 
         ProcessDefinition processDefinition = createProcessDefinition();
 
-        CommandCount expectedCommandCount = createCommandMap(count, CommandType.START_PROCESS, processDefinition.getId());
+        createCommandMap(count, CommandType.START_PROCESS, processDefinition.getCode());
 
         Long[] projectCodeArray = {processDefinition.getProjectCode()};
 
@@ -167,23 +174,22 @@ public class CommandMapperTest {
         assertThat(actualCommandCounts.size(),greaterThanOrEqualTo(1));
     }
 
-
     /**
      * create command map
      * @param count map count
      * @param commandType comman type
-     * @param processDefinitionId process definition id
+     * @param processDefinitionCode process definition code
      * @return command map
      */
     private CommandCount createCommandMap(
             Integer count,
             CommandType commandType,
-            Integer processDefinitionId){
+            long processDefinitionCode) {
 
         CommandCount commandCount = new CommandCount();
 
-        for (int i = 0 ;i < count ;i++){
-            createCommand(commandType,processDefinitionId);
+        for (int i = 0;i < count;i++) {
+            createCommand(commandType, processDefinitionCode);
         }
         commandCount.setCommandType(commandType);
         commandCount.setCount(count);
@@ -195,7 +201,7 @@ public class CommandMapperTest {
      *  create process definition
      * @return process definition
      */
-    private ProcessDefinition createProcessDefinition(){
+    private ProcessDefinition createProcessDefinition() {
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setCode(1L);
         processDefinition.setReleaseState(ReleaseState.ONLINE);
@@ -215,22 +221,21 @@ public class CommandMapperTest {
      * @param count map count
      * @return command map
      */
-    private Map<Integer,Command> createCommandMap(Integer count){
+    private Map<Integer,Command> createCommandMap(Integer count) {
         Map<Integer,Command> commandMap = new HashMap<>();
 
-        for (int i = 0; i < count ;i++){
+        for (int i = 0;i < count;i++) {
             Command command = createCommand();
             commandMap.put(command.getId(),command);
         }
         return commandMap;
     }
 
-
     /**
      * create command
      * @return
      */
-    private Command createCommand(){
+    private Command createCommand() {
         return createCommand(CommandType.START_PROCESS,1);
     }
 
@@ -238,11 +243,11 @@ public class CommandMapperTest {
      * create command
      * @return Command
      */
-    private Command createCommand(CommandType commandType,Integer processDefinitionId){
+    private Command createCommand(CommandType commandType, long processDefinitionCode) {
 
         Command command = new Command();
         command.setCommandType(commandType);
-        command.setProcessDefinitionId(processDefinitionId);
+        command.setProcessDefinitionCode(processDefinitionCode);
         command.setExecutorId(4);
         command.setCommandParam("test command param");
         command.setTaskDependType(TaskDependType.TASK_ONLY);
@@ -259,6 +264,4 @@ public class CommandMapperTest {
         return command;
     }
 
-
-
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java
index 6e18aa9..58bd93a 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java
@@ -21,6 +21,10 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.dao.entity.CommandCount;
 import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+
+import java.util.Date;
+import java.util.List;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -30,9 +34,6 @@ import org.springframework.test.annotation.Rollback;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.util.Date;
-import java.util.List;
-
 @RunWith(SpringRunner.class)
 @SpringBootTest
 @Transactional
@@ -81,7 +82,7 @@ public class ErrorCommandMapperTest {
         processDefinition.setCreateTime(new Date());
         processDefinitionMapper.insert(processDefinition);
 
-        errorCommand.setProcessDefinitionId(processDefinition.getId());
+        errorCommand.setProcessDefinitionCode(processDefinition.getCode());
         errorCommandMapper.updateById(errorCommand);
 
 
@@ -103,4 +104,4 @@ public class ErrorCommandMapperTest {
         Assert.assertNotEquals(commandCounts.size(), 0);
         Assert.assertNotEquals(commandCounts2.size(), 0);
     }
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
index d402afc..2093d95 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
@@ -18,26 +18,16 @@ package org.apache.dolphinscheduler.server.master;
 
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
-import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.graph.DAG;
-import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.common.process.ProcessDag;
 import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.utils.DagHelper;
+
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
 /**
  *  master test
  */
@@ -56,7 +46,7 @@ public class MasterCommandTest {
         Command cmd = new Command();
         cmd.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
         cmd.setCommandParam("{\"ProcessInstanceId\":325}");
-        cmd.setProcessDefinitionId(63);
+        cmd.setProcessDefinitionCode(63);
 
         commandMapper.insert(cmd);
 
@@ -66,7 +56,7 @@ public class MasterCommandTest {
     public void RecoverSuspendCommand(){
 
         Command cmd = new Command();
-        cmd.setProcessDefinitionId(44);
+        cmd.setProcessDefinitionCode(44);
         cmd.setCommandParam("{\"ProcessInstanceId\":290}");
         cmd.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
 
@@ -80,7 +70,7 @@ public class MasterCommandTest {
     public void startNewProcessCommand(){
         Command cmd = new Command();
         cmd.setCommandType(CommandType.START_PROCESS);
-        cmd.setProcessDefinitionId(167);
+        cmd.setProcessDefinitionCode(167);
         cmd.setFailureStrategy(FailureStrategy.CONTINUE);
         cmd.setWarningType(WarningType.NONE);
         cmd.setWarningGroupId(4);
@@ -94,7 +84,7 @@ public class MasterCommandTest {
         Command cmd = new Command();
         cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
         cmd.setCommandParam("{\"ProcessInstanceId\":816}");
-        cmd.setProcessDefinitionId(15);
+        cmd.setProcessDefinitionCode(15);
 
         commandMapper.insert(cmd);
     }
@@ -105,7 +95,7 @@ public class MasterCommandTest {
         cmd.setCommandType(CommandType.START_PROCESS);
         cmd.setFailureStrategy(FailureStrategy.CONTINUE);
         cmd.setWarningType(WarningType.ALL);
-        cmd.setProcessDefinitionId(72);
+        cmd.setProcessDefinitionCode(72);
         cmd.setExecutorId(10);
         commandMapper.insert(cmd);
     }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 01e4678..bd5e4df 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -270,7 +270,7 @@ public class ProcessService {
      * @return if thread is enough
      */
     private boolean checkThreadNum(Command command, int validThreadNum) {
-        int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId());
+        int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionCode());
         return validThreadNum >= commandThreadCount;
     }
 
@@ -469,12 +469,14 @@ public class ProcessService {
     /**
      * calculate sub process number in the process define.
      *
-     * @param processDefinitionId processDefinitionId
+     * @param processDefinitionCode processDefinitionCode
      * @return process thread num count
      */
-    private Integer workProcessThreadNumCount(Integer processDefinitionId) {
+    private Integer workProcessThreadNumCount(long processDefinitionCode) {
+        ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
+
         List<Integer> ids = new ArrayList<>();
-        recurseFindSubProcessId(processDefinitionId, ids);
+        recurseFindSubProcessId(processDefinition.getId(), ids);
         return ids.size() + 1;
     }
 
@@ -497,7 +499,6 @@ public class ProcessService {
                     ids.add(subProcessParam.getProcessDefinitionId());
                     recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(), ids);
                 }
-
             }
         }
     }
@@ -529,7 +530,7 @@ public class ProcessService {
                     processInstance.getTaskDependType(),
                     processInstance.getFailureStrategy(),
                     processInstance.getExecutorId(),
-                    processInstance.getProcessDefinition().getId(),
+                    processInstance.getProcessDefinition().getCode(),
                     JSONUtils.toJsonString(cmdParam),
                     processInstance.getWarningType(),
                     processInstance.getWarningGroupId(),
@@ -713,13 +714,10 @@ public class ProcessService {
         CommandType commandType = command.getCommandType();
         Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
 
-        ProcessDefinition processDefinition = null;
-        if (command.getProcessDefinitionId() != 0) {
-            processDefinition = processDefineMapper.selectById(command.getProcessDefinitionId());
-            if (processDefinition == null) {
-                logger.error("cannot find the work process define! define id : {}", command.getProcessDefinitionId());
-                return null;
-            }
+        ProcessDefinition processDefinition = getProcessDefinitionByCommand(command.getProcessDefinitionCode(), cmdParam);
+        if (processDefinition == null) {
+            logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
+            return null;
         }
 
         if (cmdParam != null) {
@@ -741,6 +739,7 @@ public class ProcessService {
                 String pId = cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD);
                 processInstanceId = Integer.parseInt(pId);
             }
+
             if (processInstanceId == 0) {
                 processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
             } else {
@@ -869,6 +868,40 @@ public class ProcessService {
     }
 
     /**
+     * get process definition by command
+     * If it is a fault-tolerant command, get the specified version of ProcessDefinition through ProcessInstance
+     * Otherwise, get the latest version of ProcessDefinition
+     *
+     * @param processDefinitionCode
+     * @param cmdParam
+     * @return ProcessDefinition
+     */
+    private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) {
+        if (cmdParam != null) {
+            int processInstanceId = 0;
+            if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
+                processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING));
+            } else if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
+                processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_SUB_PROCESS));
+            } else if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)) {
+                processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD));
+            }
+
+            if (processInstanceId != 0) {
+                ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId);
+                if (processInstance == null) {
+                    return null;
+                }
+
+                return processDefineLogMapper.queryByDefinitionCodeAndVersion(
+                        processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
+            }
+        }
+
+        return processDefineMapper.queryByCode(processDefinitionCode);
+    }
+
+    /**
      * return complement data if the process start with complement data
      *
      * @param processInstance processInstance
@@ -1103,7 +1136,7 @@ public class ProcessService {
             childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
         }
         Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
-        updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId());
+        updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode());
         initSubInstanceState(childInstance);
         createCommand(subProcessCommand);
         logger.info("sub process command created: {} ", subProcessCommand);
@@ -1152,6 +1185,7 @@ public class ProcessService {
         CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
         Map<String, String> subProcessParam = JSONUtils.toMap(task.getTaskParams());
         int childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
+        ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(childDefineId);
 
         Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
         List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
@@ -1169,7 +1203,7 @@ public class ProcessService {
                 TaskDependType.TASK_POST,
                 parentProcessInstance.getFailureStrategy(),
                 parentProcessInstance.getExecutorId(),
-                childDefineId,
+                processDefinition.getCode(),
                 processParam,
                 parentProcessInstance.getWarningType(),
                 parentProcessInstance.getWarningGroupId(),
@@ -1208,12 +1242,12 @@ public class ProcessService {
      * update sub process definition
      *
      * @param parentProcessInstance parentProcessInstance
-     * @param childDefinitionId childDefinitionId
+     * @param childDefinitionCode childDefinitionId
      */
-    private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
+    private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
         ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
                 parentProcessInstance.getProcessDefinitionVersion());
-        ProcessDefinition childDefinition = this.findProcessDefineById(childDefinitionId);
+        ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode);
         if (childDefinition != null && fatherDefinition != null) {
             childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());
             processDefineMapper.updateById(childDefinition);
@@ -1693,7 +1727,7 @@ public class ProcessService {
 
         //2 insert into recover command
         Command cmd = new Command();
-        cmd.setProcessDefinitionId(processDefinition.getId());
+        cmd.setProcessDefinitionCode(processDefinition.getCode());
         cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
         cmd.setExecutorId(processInstance.getExecutorId());
         cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index bda8ad8..eacd8bc 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -93,7 +93,7 @@ public class ProcessScheduleJob implements Job {
         command.setCommandType(CommandType.SCHEDULER);
         command.setExecutorId(schedule.getUserId());
         command.setFailureStrategy(schedule.getFailureStrategy());
-        //command.setProcessDefinitionId(schedule.getProcessDefinitionCode()); TODO next pr
+        command.setProcessDefinitionCode(schedule.getProcessDefinitionCode());
         command.setScheduleTime(scheduledFireTime);
         command.setStartTime(fireTime);
         command.setWarningGroupId(schedule.getWarningGroupId());
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 9eeec78..413a9d8 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -126,6 +126,9 @@ public class ProcessServiceTest {
         //father history: start; child null == command type: start
         parentInstance.setHistoryCmd("START_PROCESS");
         parentInstance.setCommandType(CommandType.START_PROCESS);
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setCode(1L);
+        Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition);
         command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
         Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
 
@@ -227,16 +230,15 @@ public class ProcessServiceTest {
         String host = "127.0.0.1";
         int validThreadNum = 1;
         Command command = new Command();
-        command.setProcessDefinitionId(222);
+        command.setProcessDefinitionCode(222);
         command.setCommandType(CommandType.REPEAT_RUNNING);
         command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
                 + CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}");
-        Mockito.when(processDefineMapper.selectById(command.getProcessDefinitionId())).thenReturn(null);
         Assert.assertNull(processService.handleCommand(logger, host, validThreadNum, command));
 
         //there is not enough thread for this command
         Command command1 = new Command();
-        command1.setProcessDefinitionId(123);
+        command1.setProcessDefinitionCode(123);
         command1.setCommandParam("{\"ProcessInstanceId\":222}");
         command1.setCommandType(CommandType.START_PROCESS);
         ProcessDefinition processDefinition = new ProcessDefinition();
@@ -254,31 +256,35 @@ public class ProcessServiceTest {
         processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
         ProcessInstance processInstance = new ProcessInstance();
         processInstance.setId(222);
-        Mockito.when(processDefineMapper.selectById(command1.getProcessDefinitionId())).thenReturn(processDefinition);
+        processInstance.setProcessDefinitionCode(11L);
+        processInstance.setProcessDefinitionVersion(1);
+        Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition);
+        Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
         Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
         Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command1));
 
         Command command2 = new Command();
         command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
-        command2.setProcessDefinitionId(123);
+        command2.setProcessDefinitionCode(123);
         command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
 
         Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command2));
 
         Command command3 = new Command();
-        command3.setProcessDefinitionId(123);
+        command3.setProcessDefinitionCode(123);
         command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
         command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
         Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command3));
 
         Command command4 = new Command();
-        command4.setProcessDefinitionId(123);
+        command4.setProcessDefinitionCode(123);
         command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
         command4.setCommandType(CommandType.REPEAT_RUNNING);
         Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command4));
 
         Command command5 = new Command();
-        command5.setProcessDefinitionId(123);
+        command5.setProcessDefinitionCode(123);
         HashMap<String, String> startParams = new HashMap<>();
         startParams.put("startParam1", "testStartParam1");
         HashMap<String, String> commandParams = new HashMap<>();
@@ -317,20 +323,36 @@ public class ProcessServiceTest {
 
     @Test
     public void testRecurseFindSubProcessId() {
+        int parentProcessDefineId = 1;
+        long parentProcessDefineCode = 1L;
+        int parentProcessDefineVersion = 1;
+
         ProcessDefinition processDefinition = new ProcessDefinition();
-        processDefinition.setCode(10L);
-        int parentId = 111;
-        List<Integer> ids = new ArrayList<>();
-        ProcessDefinition processDefinition2 = new ProcessDefinition();
-        processDefinition2.setCode(11L);
-        Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition);
+        processDefinition.setCode(parentProcessDefineCode);
+        processDefinition.setVersion(parentProcessDefineVersion);
+        Mockito.when(processDefineMapper.selectById(parentProcessDefineId)).thenReturn(processDefinition);
+
+        long postTaskCode = 2L;
+        int postTaskVersion = 2;
+
         List<ProcessTaskRelationLog> relationLogList = new ArrayList<>();
-        Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong()
-                , Mockito.anyInt()))
-                .thenReturn(relationLogList);
+        ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
+        processTaskRelationLog.setPostTaskCode(postTaskCode);
+        processTaskRelationLog.setPostTaskVersion(postTaskVersion);
+        relationLogList.add(processTaskRelationLog);
+        Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(parentProcessDefineCode
+                , parentProcessDefineVersion)).thenReturn(relationLogList);
 
-        processService.recurseFindSubProcessId(parentId, ids);
+        List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
+        TaskDefinitionLog taskDefinitionLog1 = new TaskDefinitionLog();
+        taskDefinitionLog1.setTaskParams("{\"processDefinitionId\": 123}");
+        taskDefinitionLogs.add(taskDefinitionLog1);
+        Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(Mockito.anySet())).thenReturn(taskDefinitionLogs);
+
+        List<Integer> ids = new ArrayList<>();
+        processService.recurseFindSubProcessId(parentProcessDefineId, ids);
 
+        Assert.assertEquals(1, ids.size());
     }
 
     @Test
@@ -499,7 +521,7 @@ public class ProcessServiceTest {
     @Test
     public void testCreateCommand() {
         Command command = new Command();
-        command.setProcessDefinitionId(123);
+        command.setProcessDefinitionCode(123);
         command.setCommandParam("{\"ProcessInstanceId\":222}");
         command.setCommandType(CommandType.START_PROCESS);
         int mockResult = 1;
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 11bdc8d..d056b23 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -319,7 +319,7 @@ DROP TABLE IF EXISTS `t_ds_command`;
 CREATE TABLE `t_ds_command` (
   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
   `command_type` tinyint(4) DEFAULT NULL COMMENT 'Command type: 0 start workflow, 1 start execution from current node, 2 resume fault-tolerant workflow, 3 resume pause process, 4 start execution from failed node, 5 complement, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 resume waiting thread',
-  `process_definition_id` int(11) DEFAULT NULL COMMENT 'process definition id',
+  `process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process definition code',
   `command_param` text COMMENT 'json command parameters',
   `task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'Node dependency type: 0 current node, 1 forward, 2 backward',
   `failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'Failed policy: 0 end, 1 continue',
@@ -367,7 +367,7 @@ CREATE TABLE `t_ds_error_command` (
   `id` int(11) NOT NULL COMMENT 'key',
   `command_type` tinyint(4) DEFAULT NULL COMMENT 'command type',
   `executor_id` int(11) DEFAULT NULL COMMENT 'executor id',
-  `process_definition_id` int(11) DEFAULT NULL COMMENT 'process definition id',
+  `process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process definition code',
   `command_param` text COMMENT 'json command parameters',
   `task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'task depend type',
   `failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'failure strategy',
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index eb97912..f396781 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -220,7 +220,7 @@ DROP TABLE IF EXISTS t_ds_command;
 CREATE TABLE t_ds_command (
   id int NOT NULL  ,
   command_type int DEFAULT NULL ,
-  process_definition_id int DEFAULT NULL ,
+  process_definition_code bigint NOT NULL ,
   command_param text ,
   task_depend_type int DEFAULT NULL ,
   failure_strategy int DEFAULT '0' ,
@@ -262,7 +262,7 @@ CREATE TABLE t_ds_error_command (
   id int NOT NULL ,
   command_type int DEFAULT NULL ,
   executor_id int DEFAULT NULL ,
-  process_definition_id int DEFAULT NULL ,
+  process_definition_code bigint NOT NULL ,
   command_param text ,
   task_depend_type int DEFAULT NULL ,
   failure_strategy int DEFAULT '0' ,