You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2021/08/02 02:32:41 UTC

[dolphinscheduler] branch json_split_two updated: [Fix-5778]: The t_ds_schedules table, process_definition_id change to process_definition_code (#5928)

This is an automated email from the ASF dual-hosted git repository.

wenhemin pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split_two by this push:
     new d1a61f1  [Fix-5778]: The t_ds_schedules table, process_definition_id change to process_definition_code (#5928)
d1a61f1 is described below

commit d1a61f16934c0c9423a507cc129d55ff5aace84a
Author: wen-hemin <39...@users.noreply.github.com>
AuthorDate: Mon Aug 2 10:32:32 2021 +0800

    [Fix-5778]: The t_ds_schedules table, process_definition_id change to process_definition_code (#5928)
    
    * fix: the t_ds_schedules table, process_definition_id change to process_definition_code
    
    * fix checkstyle
    
    * fix: recovery code
    
    * fix UT
    
    Co-authored-by: wen-hemin <we...@apache.com>
---
 .../api/service/impl/ExecutorServiceImpl.java      |   2 +-
 .../service/impl/ProcessDefinitionServiceImpl.java |   8 +-
 .../api/service/impl/SchedulerServiceImpl.java     |  14 +--
 .../api/service/ExecutorServiceTest.java           |  12 +--
 .../api/service/ProcessDefinitionServiceTest.java  |  18 +---
 .../api/service/SchedulerServiceTest.java          |  12 ++-
 .../dolphinscheduler/dao/entity/Schedule.java      |  15 +--
 .../dao/mapper/ScheduleMapper.java                 |  32 +++---
 .../dao/mapper/ProcessDefinitionMapper.xml         |   6 +-
 .../dolphinscheduler/dao/mapper/ScheduleMapper.xml |  32 +++---
 .../dao/mapper/WorkFlowLineageMapper.xml           |   4 +-
 .../dao/mapper/ScheduleMapperTest.java             |  44 ++++----
 .../dao/mapper/WorkFlowLineageMapperTest.java      |   7 +-
 .../server/master/runner/MasterExecThread.java     |   4 +-
 .../server/master/MasterExecThreadTest.java        |  13 +--
 .../service/process/ProcessService.java            | 120 ++++-----------------
 .../service/quartz/ProcessScheduleJob.java         |   6 +-
 sql/dolphinscheduler_mysql.sql                     |   2 +-
 sql/dolphinscheduler_postgre.sql                   |   2 +-
 19 files changed, 134 insertions(+), 219 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 1d944d0..8978257 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -549,7 +549,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                     command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                     return processService.createCommand(command);
                 } else if (runMode == RunMode.RUN_MODE_PARALLEL) {
-                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
+                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineId); // TODO: next pr change to code
                     List<Date> listDate = new LinkedList<>();
                     if (!CollectionUtils.isEmpty(schedules)) {
                         for (Schedule item : schedules) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index b28e899..65e2b10 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -555,7 +555,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
 
         // get the timing according to the process definition
-        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
+        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionId);
         if (!schedules.isEmpty() && schedules.size() > 1) {
             logger.warn("scheduler num is {},Greater than 1", schedules.size());
             putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
@@ -630,7 +630,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 processDefinition.setReleaseState(releaseState);
                 processDefinitionMapper.updateById(processDefinition);
                 List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray(
-                        new int[]{processDefinition.getId()}
+                        new long[]{processDefinition.getCode()}
                 );
 
                 for (Schedule schedule : scheduleList) {
@@ -712,7 +712,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @return DagDataSchedule
      */
     public DagDataSchedule exportProcessDagData(ProcessDefinition processDefinition) {
-        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinition.getId());
+        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode());
         DagDataSchedule dagDataSchedule = new DagDataSchedule(processService.genDagData(processDefinition));
         if (!schedules.isEmpty()) {
             Schedule schedule = schedules.get(0);
@@ -821,7 +821,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         Schedule schedule = dagDataSchedule.getSchedule();
         if (null != schedule) {
             ProcessDefinition newProcessDefinition = processDefinitionMapper.queryByCode(processDefinition.getCode());
-            schedule.setProcessDefinitionId(newProcessDefinition.getId());
+            schedule.setProcessDefinitionCode(newProcessDefinition.getCode());
             schedule.setUserId(loginUser.getId());
             schedule.setCreateTime(now);
             schedule.setUpdateTime(now);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 350ab9a..edc7074 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -140,7 +140,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
         Date now = new Date();
 
         scheduleObj.setProjectName(project.getName());
-        scheduleObj.setProcessDefinitionId(processDefinition.getId());
+        scheduleObj.setProcessDefinitionCode(processDefineCode);
         scheduleObj.setProcessDefinitionName(processDefinition.getName());
 
         ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
@@ -228,9 +228,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
             return result;
         }
 
-        ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId());
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
         if (processDefinition == null) {
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionId());
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionCode());
             return result;
         }
 
@@ -326,9 +326,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
             putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
             return result;
         }
-        ProcessDefinition processDefinition = processService.findProcessDefineById(scheduleObj.getProcessDefinitionId());
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
         if (processDefinition == null) {
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionId());
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode());
             return result;
         }
 
@@ -342,7 +342,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
             }
             // check sub process definition release state
             List<Integer> subProcessDefineIds = new ArrayList<>();
-            processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
+            processService.recurseFindSubProcessId(processDefinition.getId(), subProcessDefineIds);
             Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]);
             if (!subProcessDefineIds.isEmpty()) {
                 List<ProcessDefinition> subProcessDefinitionList =
@@ -430,7 +430,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
         }
 
         Page<Schedule> page = new Page<>(pageNo, pageSize);
-        IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(page, processDefinition.getId(),
+        IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode,
             searchVal);
 
         PageInfo<Schedule> pageInfo = new PageInfo<>(pageNo, pageSize);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 624fa6e..2cae051 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -152,7 +152,7 @@ public class ExecutorServiceTest {
     @Test
     public void testNoComplement() {
 
-        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.START_PROCESS,
                 null, null,
@@ -170,7 +170,7 @@ public class ExecutorServiceTest {
     @Test
     public void testComplementWithStartNodeList() {
 
-        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.START_PROCESS,
                 null, "n1,n2",
@@ -188,7 +188,7 @@ public class ExecutorServiceTest {
     @Test
     public void testDateError() {
 
-        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
                 null, null,
@@ -205,7 +205,7 @@ public class ExecutorServiceTest {
     @Test
     public void testSerial() {
 
-        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
                 null, null,
@@ -222,7 +222,7 @@ public class ExecutorServiceTest {
     @Test
     public void testParallelWithOutSchedule() {
 
-        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
                 null, null,
@@ -240,7 +240,7 @@ public class ExecutorServiceTest {
     @Test
     public void testParallelWithSchedule() {
 
-        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
+        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(oneSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
                 null, null,
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index c25fa59..f6a6a7f 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.api.service;
 
 import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
 
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
@@ -34,8 +33,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.DagData;
@@ -55,10 +52,6 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import org.apache.http.entity.ContentType;
-
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -70,7 +63,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServletResponse;
 
 import org.junit.Assert;
@@ -80,8 +72,6 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.springframework.mock.web.MockMultipartFile;
-import org.springframework.web.multipart.MultipartFile;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -516,7 +506,7 @@ public class ProcessDefinitionServiceTest {
         List<Schedule> schedules = new ArrayList<>();
         schedules.add(getSchedule());
         schedules.add(getSchedule());
-        Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
+        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules);
         Map<String, Object> schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
         Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS));
 
@@ -527,7 +517,7 @@ public class ProcessDefinitionServiceTest {
         schedules.add(schedule);
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
-        Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
+        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules);
         Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
         Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS));
 
@@ -537,7 +527,7 @@ public class ProcessDefinitionServiceTest {
         schedules.add(schedule);
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
-        Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
+        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules);
         Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0);
         Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
         Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
@@ -950,7 +940,7 @@ public class ProcessDefinitionServiceTest {
         Date date = new Date();
         Schedule schedule = new Schedule();
         schedule.setId(46);
-        schedule.setProcessDefinitionId(1);
+        schedule.setProcessDefinitionCode(1);
         schedule.setStartTime(date);
         schedule.setEndTime(date);
         schedule.setCrontab("0 0 5 * * ? *");
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
index 7cd383a..238d1d0 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -71,6 +72,9 @@ public class SchedulerServiceTest {
     private ProjectMapper projectMapper;
 
     @Mock
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Mock
     private ProjectServiceImpl projectService;
 
     @Mock
@@ -102,7 +106,7 @@ public class SchedulerServiceTest {
 
         Schedule schedule = new Schedule();
         schedule.setId(1);
-        schedule.setProcessDefinitionId(1);
+        schedule.setProcessDefinitionCode(1);
         schedule.setReleaseState(ReleaseState.OFFLINE);
 
         List<Server> masterServers = new ArrayList<>();
@@ -113,7 +117,7 @@ public class SchedulerServiceTest {
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
 
-        Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition);
 
         //hash no auth
         result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
@@ -128,10 +132,10 @@ public class SchedulerServiceTest {
         Assert.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, result.get(Constants.STATUS));
 
         //PROCESS_DEFINE_NOT_EXIST
-        schedule.setProcessDefinitionId(2);
+        schedule.setProcessDefinitionCode(2);
         result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, result.get(Constants.STATUS));
-        schedule.setProcessDefinitionId(1);
+        schedule.setProcessDefinitionCode(1);
 
         // PROCESS_DEFINE_NOT_RELEASE
         result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java
index 74ed5c1..e1b4c90 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java
@@ -39,10 +39,11 @@ public class Schedule {
 
     @TableId(value = "id", type = IdType.AUTO)
     private int id;
+
     /**
-     * process definition id
+     * process definition code
      */
-    private int processDefinitionId;
+    private long processDefinitionCode;
 
     /**
      * process definition name
@@ -222,12 +223,12 @@ public class Schedule {
         this.releaseState = releaseState;
     }
 
-    public int getProcessDefinitionId() {
-        return processDefinitionId;
+    public long getProcessDefinitionCode() {
+        return processDefinitionCode;
     }
 
-    public void setProcessDefinitionId(int processDefinitionId) {
-        this.processDefinitionId = processDefinitionId;
+    public void setProcessDefinitionCode(long processDefinitionCode) {
+        this.processDefinitionCode = processDefinitionCode;
     }
 
     public String getProcessDefinitionName() {
@@ -290,7 +291,7 @@ public class Schedule {
     public String toString() {
         return "Schedule{"
                 + "id=" + id
-                + ", processDefinitionId=" + processDefinitionId
+                + ", processDefinitionCode=" + processDefinitionCode
                 + ", processDefinitionName='" + processDefinitionName + '\''
                 + ", projectName='" + projectName + '\''
                 + ", description='" + definitionDescription + '\''
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index 225677d..37fae5d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -17,12 +17,14 @@
 package org.apache.dolphinscheduler.dao.mapper;
 
 import org.apache.dolphinscheduler.dao.entity.Schedule;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
+
 import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
 
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+
 /**
  * scheduler mapper interface
  */
@@ -31,13 +33,13 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
     /**
      * scheduler page
      * @param page page
-     * @param processDefinitionId processDefinitionId
+     * @param processDefinitionCode processDefinitionCode
      * @param searchVal searchVal
      * @return scheduler IPage
      */
-    IPage<Schedule> queryByProcessDefineIdPaging(IPage<Schedule> page,
-                                                 @Param("processDefinitionId") int processDefinitionId,
-                                                 @Param("searchVal") String searchVal);
+    IPage<Schedule> queryByProcessDefineCodePaging(IPage<Schedule> page,
+                                                   @Param("processDefinitionCode") long processDefinitionCode,
+                                                   @Param("searchVal") String searchVal);
 
     /**
      * query schedule list by project name
@@ -47,24 +49,24 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
     List<Schedule> querySchedulerListByProjectName(@Param("projectName") String projectName);
 
     /**
-     * query schedule list by process definition ids
-     * @param processDefineIds processDefineIds
+     * query schedule list by process definition codes
+     * @param processDefineCodes processDefineCodes
      * @return schedule list
      */
-    List<Schedule> selectAllByProcessDefineArray(@Param("processDefineIds") int[] processDefineIds);
+    List<Schedule> selectAllByProcessDefineArray(@Param("processDefineCodes") long[] processDefineCodes);
 
     /**
-     * query schedule list by process definition id
-     * @param processDefinitionId processDefinitionId
+     * query schedule list by process definition code
+     * @param processDefinitionCode processDefinitionCode
      * @return schedule list
      */
-    List<Schedule> queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);
+    List<Schedule> queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
 
     /**
-     * query schedule list by process definition id
-     * @param processDefinitionId processDefinitionId
+     * query schedule list by process definition code
+     * @param processDefinitionCode processDefinitionCode
      * @return schedule list
      */
-    List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);
+    List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
 
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
index 0ede680..590e447 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
@@ -72,8 +72,8 @@
         td.global_params, td.flag, td.warning_group_id, td.timeout, td.tenant_id, td.update_time, td.create_time,
         sc.schedule_release_state, tu.user_name
         FROM t_ds_process_definition td
-        left join (select process_definition_id,release_state as schedule_release_state from t_ds_schedules group by
-        process_definition_id,release_state) sc on sc.process_definition_id = td.id
+        left join (select process_definition_code,release_state as schedule_release_state from t_ds_schedules group by
+        process_definition_code,release_state) sc on sc.process_definition_code = td.code
         left join t_ds_user tu on td.user_id = tu.id
         where td.project_code = #{projectCode}
         <if test=" searchVal != null and searchVal != ''">
@@ -120,7 +120,7 @@
         </if>
         group by td.user_id,tu.user_name
     </select>
-    
+
     <select id="queryByDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
         SELECT
             pd.id, pd.code, pd.name, pd.version, pd.release_state, pd.project_code, pd.user_id, pd.description,
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
index 587680f..482d30b 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
@@ -19,32 +19,34 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
 <mapper namespace="org.apache.dolphinscheduler.dao.mapper.ScheduleMapper">
     <sql id="baseSql">
-        id, process_definition_id, start_time, end_time, timezone_id, crontab, failure_strategy, user_id, release_state,
+        id, process_definition_code, start_time, end_time, timezone_id, crontab, failure_strategy, user_id, release_state,
         warning_type, warning_group_id, process_instance_priority, worker_group, create_time, update_time
     </sql>
     <sql id="baseSqlV2">
-        ${alias}.id, ${alias}.process_definition_id, ${alias}.start_time, ${alias}.end_time, ${alias}.timezone_id, ${alias}.crontab, ${alias}.failure_strategy, ${alias}.user_id, ${alias}.release_state,
-        ${alias}.warning_type, ${alias}.warning_group_id, ${alias}.process_instance_priority, ${alias}.worker_group, ${alias}.create_time, ${alias}.update_time
+        ${alias}.id, ${alias}.process_definition_code, ${alias}.start_time, ${alias}.end_time, ${alias}.timezone_id,
+        ${alias}.crontab, ${alias}.failure_strategy, ${alias}.user_id, ${alias}.release_state, ${alias}.warning_type,
+        ${alias}.warning_group_id, ${alias}.process_instance_priority, ${alias}.worker_group, ${alias}.create_time,
+        ${alias}.update_time
     </sql>
-    <select id="queryByProcessDefineIdPaging" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
+    <select id="queryByProcessDefineCodePaging" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
         select p_f.name as process_definition_name, p.name as project_name,u.user_name,
         <include refid="baseSqlV2">
             <property name="alias" value="s"/>
         </include>
         from t_ds_schedules s
-        join t_ds_process_definition p_f on s.process_definition_id = p_f.id
+        join t_ds_process_definition p_f on s.process_definition_code = p_f.code
         join t_ds_project as p on p_f.project_code = p.code
         join t_ds_user as u on s.user_id = u.id
         where 1=1
-        <if test="processDefinitionId!= 0">
-            and s.process_definition_id = #{processDefinitionId}
+        <if test="processDefinitionCode != 0">
+            and s.process_definition_code = #{processDefinitionCode}
         </if>
         order by s.update_time desc
     </select>
     <select id="querySchedulerListByProjectName" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
         select p_f.name as process_definition_name, p_f.description as definition_description, p.name as project_name,u.user_name,s.*
         from t_ds_schedules s
-        join t_ds_process_definition p_f on s.process_definition_id = p_f.id
+        join t_ds_process_definition p_f on s.process_definition_code = p_f.code
         join t_ds_project as p on p_f.project_code = p.code
         join t_ds_user as u on s.user_id = u.id
         where p.name = #{projectName}
@@ -54,25 +56,25 @@
         <include refid="baseSql"/>
         from t_ds_schedules
         where 1= 1
-        <if test="processDefineIds != null and processDefineIds.length != 0 ">
-            and process_definition_id in
-            <foreach collection="processDefineIds" index="index" item="i" open="(" separator="," close=")">
+        <if test="processDefineCodes != null and processDefineCodes.length != 0 ">
+            and process_definition_code in
+            <foreach collection="processDefineCodes" index="index" item="i" open="(" separator="," close=")">
                 #{i}
             </foreach>
         </if>
         and release_state = 1
     </select>
-    <select id="queryByProcessDefinitionId" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
+    <select id="queryByProcessDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
         select
         <include refid="baseSql"/>
         from t_ds_schedules
-        where process_definition_id =#{processDefinitionId}
+        where process_definition_code = #{processDefinitionCode}
     </select>
-    <select id="queryReleaseSchedulerListByProcessDefinitionId"
+    <select id="queryReleaseSchedulerListByProcessDefinitionCode"
             resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
         select
         <include refid="baseSql"/>
         from t_ds_schedules
-        where process_definition_id =#{processDefinitionId} and release_state = 1
+        where process_definition_code = #{processDefinitionCode} and release_state = 1
     </select>
 </mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index b78113b..eeddaf5 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -22,7 +22,7 @@
     <select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
         select tepd.id as work_flow_id,tepd.name as work_flow_name
         from t_ds_process_definition tepd
-        left join t_ds_schedules tes on tepd.id = tes.process_definition_id
+        left join t_ds_schedules tes on tepd.code = tes.process_definition_code
         where tepd.project_code = #{projectCode}
         <if test="searchVal != null and searchVal != ''">
             and tepd.name like concat('%', #{searchVal}, '%')
@@ -74,7 +74,7 @@
                 tes.crontab as crontab,
                 tes.release_state as schedule_publish_status
         from t_ds_process_definition tepd
-        left join t_ds_schedules tes on tepd.id = tes.process_definition_id
+        left join t_ds_schedules tes on tepd.code = tes.process_definition_code
         where tepd.project_code = #{projectCode} and tepd.code = #{processDefinitionCode}
     </select>
 
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
index e55aaca..752eca4 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.dao.mapper;
 
+package org.apache.dolphinscheduler.dao.mapper;
 
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
@@ -24,8 +24,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.User;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
+import java.util.Date;
+import java.util.List;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -35,8 +37,8 @@ import org.springframework.test.annotation.Rollback;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.util.Date;
-import java.util.List;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest
@@ -61,7 +63,7 @@ public class ScheduleMapperTest {
      * insert
      * @return Schedule
      */
-    private Schedule insertOne(){
+    private Schedule insertOne() {
         //insertOne
         Schedule schedule = new Schedule();
         schedule.setStartTime(new Date());
@@ -80,7 +82,7 @@ public class ScheduleMapperTest {
      * test update
      */
     @Test
-    public void testUpdate(){
+    public void testUpdate() {
         //insertOne
         Schedule schedule = insertOne();
         schedule.setCreateTime(new Date());
@@ -93,7 +95,7 @@ public class ScheduleMapperTest {
      * test delete
      */
     @Test
-    public void testDelete(){
+    public void testDelete() {
         Schedule schedule = insertOne();
         int delete = scheduleMapper.deleteById(schedule.getId());
         Assert.assertEquals(delete, 1);
@@ -138,18 +140,15 @@ public class ScheduleMapperTest {
         processDefinition.setUpdateTime(new Date());
         processDefinitionMapper.insert(processDefinition);
 
-        Schedule schedule= insertOne();
+        Schedule schedule = insertOne();
         schedule.setUserId(user.getId());
-        schedule.setProcessDefinitionId(processDefinition.getId());
+        schedule.setProcessDefinitionCode(processDefinition.getCode());
         scheduleMapper.insert(schedule);
 
         Page<Schedule> page = new Page(1,3);
-        IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(page,
-                processDefinition.getId(), ""
-        );
+        IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineCodePaging(page,
+                processDefinition.getCode(), "");
         Assert.assertNotEquals(scheduleIPage.getSize(), 0);
-
-
     }
 
     /**
@@ -158,7 +157,6 @@ public class ScheduleMapperTest {
     @Test
     public void testQuerySchedulerListByProjectName() {
 
-
         User user = new User();
         user.setUserName("ut name");
         userMapper.insert(user);
@@ -181,9 +179,9 @@ public class ScheduleMapperTest {
         processDefinition.setUpdateTime(new Date());
         processDefinitionMapper.insert(processDefinition);
 
-        Schedule schedule= insertOne();
+        Schedule schedule = insertOne();
         schedule.setUserId(user.getId());
-        schedule.setProcessDefinitionId(processDefinition.getId());
+        schedule.setProcessDefinitionCode(processDefinition.getCode());
         scheduleMapper.insert(schedule);
 
         Page<Schedule> page = new Page(1,3);
@@ -201,11 +199,11 @@ public class ScheduleMapperTest {
     public void testSelectAllByProcessDefineArray() {
 
         Schedule schedule = insertOne();
-        schedule.setProcessDefinitionId(12345);
+        schedule.setProcessDefinitionCode(12345);
         schedule.setReleaseState(ReleaseState.ONLINE);
         scheduleMapper.updateById(schedule);
 
-        List<Schedule> schedules= scheduleMapper.selectAllByProcessDefineArray(new int[] {schedule.getProcessDefinitionId()});
+        List<Schedule> schedules = scheduleMapper.selectAllByProcessDefineArray(new long[] {schedule.getProcessDefinitionCode()});
         Assert.assertNotEquals(schedules.size(), 0);
     }
 
@@ -215,10 +213,10 @@ public class ScheduleMapperTest {
     @Test
     public void queryByProcessDefinitionId() {
         Schedule schedule = insertOne();
-        schedule.setProcessDefinitionId(12345);
+        schedule.setProcessDefinitionCode(12345);
         scheduleMapper.updateById(schedule);
 
-        List<Schedule> schedules= scheduleMapper.queryByProcessDefinitionId(schedule.getProcessDefinitionId());
+        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode());
         Assert.assertNotEquals(schedules.size(), 0);
     }
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
index 28e9dc2..cc60ce4 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
@@ -112,7 +112,7 @@ public class WorkFlowLineageMapperTest {
         schedule.setWarningType(WarningType.NONE);
         schedule.setCreateTime(new Date());
         schedule.setUpdateTime(new Date());
-        schedule.setProcessDefinitionId(id);
+        schedule.setProcessDefinitionCode(id);
         scheduleMapper.insert(schedule);
         return schedule;
     }
@@ -131,8 +131,9 @@ public class WorkFlowLineageMapperTest {
     public void testQueryCodeRelation() {
         ProcessTaskRelation processTaskRelation = insertOneProcessTaskRelation();
 
-        List<ProcessLineage> workFlowLineages = workFlowLineageMapper.queryCodeRelation(processTaskRelation.getPreTaskCode()
-                , processTaskRelation.getPreTaskVersion(), 11L, 1L);
+        List<ProcessLineage> workFlowLineages = workFlowLineageMapper.queryCodeRelation(
+            processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion(),
+            processTaskRelation.getProcessDefinitionCode(), processTaskRelation.getProjectCode());
         Assert.assertNotEquals(workFlowLineages.size(), 0);
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 1863087..72f7b47 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -255,8 +255,8 @@ public class MasterExecThread implements Runnable {
         processService.saveProcessInstance(processInstance);
 
         // get schedules
-        int processDefinitionId = processInstance.getProcessDefinition().getId();
-        List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
+        List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
+                processInstance.getProcessDefinitionCode());
         List<Date> listDate = Lists.newLinkedList();
         if (!CollectionUtils.isEmpty(schedules)) {
             for (Schedule schedule : schedules) {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
index fbc4ed8..196fb54 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
@@ -76,7 +76,7 @@ public class MasterExecThreadTest {
 
     private ProcessService processService;
 
-    private int processDefinitionId = 1;
+    private long processDefinitionCode = 1L;
 
     private MasterConfig config;
 
@@ -104,6 +104,7 @@ public class MasterExecThreadTest {
         processDefinition.setGlobalParamMap(Collections.EMPTY_MAP);
         processDefinition.setGlobalParamList(Collections.EMPTY_LIST);
         Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
+        Mockito.when(processInstance.getProcessDefinitionCode()).thenReturn(processDefinitionCode);
 
         masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService, null, null, config));
         // prepareProcess init dag
@@ -120,9 +121,9 @@ public class MasterExecThreadTest {
      * without schedule
      */
     @Test
-    public void testParallelWithOutSchedule() throws ParseException {
+    public void testParallelWithOutSchedule() {
         try {
-            Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+            Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
             Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
             method.setAccessible(true);
             method.invoke(masterExecThread);
@@ -140,12 +141,12 @@ public class MasterExecThreadTest {
     @Test
     public void testParallelWithSchedule() {
         try {
-            Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
+            Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(oneSchedulerList());
             Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
             method.setAccessible(true);
             method.invoke(masterExecThread);
             // one create save, and 9(1 to 20 step 2) for next save, and last day 31 no save
-            verify(processService, times(20)).saveProcessInstance(processInstance);
+            verify(processService, times(9)).saveProcessInstance(processInstance);
         } catch (Exception e) {
             Assert.fail();
         }
@@ -267,4 +268,4 @@ public class MasterExecThreadTest {
         return schedulerList;
     }
 
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 83b5e8c..01e4678 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ConditionType;
-import org.apache.dolphinscheduler.common.enums.CycleEnum;
 import org.apache.dolphinscheduler.common.enums.Direct;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -64,7 +63,6 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeExceptio
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.CycleDependency;
 import org.apache.dolphinscheduler.dao.entity.DagData;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
@@ -108,11 +106,9 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.log.LogClientService;
-import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Calendar;
 import java.util.Date;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -125,14 +121,12 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
-import com.cronutils.model.Cron;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -404,6 +398,16 @@ public class ProcessService {
     }
 
     /**
+     * Look up a process definition by its code.
+     *
+     * @param processDefinitionCode code handed to {@code processDefineMapper.queryByCode}
+     * @return the mapper's result for that code; NOTE(review): presumably null when nothing matches - confirm
+     */
+    public ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode) {
+        return processDefineMapper.queryByCode(processDefinitionCode);
+    }
+
+    /**
      * delete work process instance by id
      *
      * @param processInstanceId processInstanceId
@@ -483,7 +487,6 @@ public class ProcessService {
     public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
         List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinitionId(parentId);
 
-
         if (taskNodeList != null && !taskNodeList.isEmpty()) {
 
             for (TaskDefinition taskNode : taskNodeList) {
@@ -1628,7 +1631,6 @@ public class ProcessService {
         taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams));
     }
 
-
     /**
      * convert integer list to string list
      *
@@ -1657,13 +1659,13 @@ public class ProcessService {
     }
 
     /**
-     * query Schedule by processDefinitionId
+     * query the released (release_state = 1) schedules of a process definition, looked up by its code
      *
-     * @param processDefinitionId processDefinitionId
+     * @param processDefinitionCode code of the process definition whose schedules are queried
      * @see Schedule
      */
-    public List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(int processDefinitionId) {
-        return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
+    public List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode) {
+        return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
     }
 
     /**
@@ -1730,7 +1732,6 @@ public class ProcessService {
         ProcessInstance instance = processInstanceMapper.selectById(processInstanceId);
         instance.setState(executionStatus);
         return processInstanceMapper.updateById(instance);
-
     }
 
     /**
@@ -1785,98 +1786,13 @@ public class ProcessService {
     }
 
     /**
-     * find schedule list by process define id.
+     * find schedule list by process definition codes.
      *
-     * @param ids ids
+     * @param codes process definition codes
      * @return schedule list
      */
-    public List<Schedule> selectAllByProcessDefineId(int[] ids) {
-        return scheduleMapper.selectAllByProcessDefineArray(
-                ids);
-    }
-
-    /**
-     * get dependency cycle by work process define id and scheduler fire time
-     *
-     * @param masterId masterId
-     * @param processDefinitionId processDefinitionId
-     * @param scheduledFireTime the time the task schedule is expected to trigger
-     * @return CycleDependency
-     * @throws Exception if error throws Exception
-     */
-    public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception {
-        List<CycleDependency> list = getCycleDependencies(masterId, new int[]{processDefinitionId}, scheduledFireTime);
-        return !list.isEmpty() ? list.get(0) : null;
-
-    }
-
-    /**
-     * get dependency cycle list by work process define id list and scheduler fire time
-     *
-     * @param masterId masterId
-     * @param ids ids
-     * @param scheduledFireTime the time the task schedule is expected to trigger
-     * @return CycleDependency list
-     * @throws Exception if error throws Exception
-     */
-    public List<CycleDependency> getCycleDependencies(int masterId, int[] ids, Date scheduledFireTime) throws Exception {
-        List<CycleDependency> cycleDependencyList = new ArrayList<>();
-        if (null == ids || ids.length == 0) {
-            logger.warn("ids[] is empty!is invalid!");
-            return cycleDependencyList;
-        }
-        if (scheduledFireTime == null) {
-            logger.warn("scheduledFireTime is null!is invalid!");
-            return cycleDependencyList;
-        }
-
-        String strCrontab = "";
-        CronExpression depCronExpression;
-        Cron depCron;
-        List<Date> list;
-        List<Schedule> schedules = this.selectAllByProcessDefineId(ids);
-        // for all scheduling information
-        for (Schedule depSchedule : schedules) {
-            strCrontab = depSchedule.getCrontab();
-            depCronExpression = CronUtils.parse2CronExpression(strCrontab);
-            depCron = CronUtils.parse2Cron(strCrontab);
-            CycleEnum cycleEnum = CronUtils.getMiniCycle(depCron);
-            if (cycleEnum == null) {
-                logger.error("{} is not valid", strCrontab);
-                continue;
-            }
-            Calendar calendar = Calendar.getInstance();
-            switch (cycleEnum) {
-                case HOUR:
-                    calendar.add(Calendar.HOUR, -25);
-                    break;
-                case DAY:
-                case WEEK:
-                    calendar.add(Calendar.DATE, -32);
-                    break;
-                case MONTH:
-                    calendar.add(Calendar.MONTH, -13);
-                    break;
-                default:
-                    String cycleName = cycleEnum.name();
-                    logger.warn("Dependent process definition's  cycleEnum is {},not support!!", cycleName);
-                    continue;
-            }
-            Date start = calendar.getTime();
-
-            if (depSchedule.getProcessDefinitionId() == masterId) {
-                list = CronUtils.getSelfFireDateList(start, scheduledFireTime, depCronExpression);
-            } else {
-                list = CronUtils.getFireDateList(start, scheduledFireTime, depCronExpression);
-            }
-            if (!list.isEmpty()) {
-                start = list.get(list.size() - 1);
-                CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(), start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
-                cycleDependencyList.add(dependency);
-            }
-
-        }
-        return cycleDependencyList;
+    public List<Schedule> selectAllByProcessDefineCode(long[] codes) {
+        return scheduleMapper.selectAllByProcessDefineArray(codes);
     }
 
     /**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index 2921ce2..bda8ad8 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -81,11 +81,11 @@ public class ProcessScheduleJob implements Job {
             return;
         }
 
-        ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
+        ProcessDefinition processDefinition = getProcessService().findProcessDefinitionByCode(schedule.getProcessDefinitionCode());
         // release state : online/offline
         ReleaseState releaseState = processDefinition.getReleaseState();
         if (releaseState == ReleaseState.OFFLINE) {
-            logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, scheduleId);
+            logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, processDefinition.getId());
             return;
         }
 
@@ -93,7 +93,7 @@ public class ProcessScheduleJob implements Job {
         command.setCommandType(CommandType.SCHEDULER);
         command.setExecutorId(schedule.getUserId());
         command.setFailureStrategy(schedule.getFailureStrategy());
-        command.setProcessDefinitionId(schedule.getProcessDefinitionId());
+        //command.setProcessDefinitionId(schedule.getProcessDefinitionCode()); TODO next pr
         command.setScheduleTime(scheduledFireTime);
         command.setStartTime(fireTime);
         command.setWarningGroupId(schedule.getWarningGroupId());
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index a6e3f97..11bdc8d 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -747,7 +747,7 @@ CREATE TABLE `t_ds_resources` (
 DROP TABLE IF EXISTS `t_ds_schedules`;
 CREATE TABLE `t_ds_schedules` (
   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
-  `process_definition_id` int(11) NOT NULL COMMENT 'process definition id',
+  `process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
   `start_time` datetime NOT NULL COMMENT 'start time',
   `end_time` datetime NOT NULL COMMENT 'end time',
   `timezone_id` varchar(40) DEFAULT NULL COMMENT 'timezoneId',
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 88cf143..eb97912 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -611,7 +611,7 @@ CREATE TABLE t_ds_resources (
 DROP TABLE IF EXISTS t_ds_schedules;
 CREATE TABLE t_ds_schedules (
   id int NOT NULL  ,
-  process_definition_id int NOT NULL ,
+  process_definition_code bigint NOT NULL ,
   start_time timestamp NOT NULL ,
   end_time timestamp NOT NULL ,
   timezone_id varchar(40) default NULL ,