You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2021/08/02 02:32:41 UTC
[dolphinscheduler] branch json_split_two updated: [Fix-5778]: The
t_ds_schedules table,
process_definition_id change to process_definition_code (#5928)
This is an automated email from the ASF dual-hosted git repository.
wenhemin pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split_two by this push:
new d1a61f1 [Fix-5778]: The t_ds_schedules table, process_definition_id change to process_definition_code (#5928)
d1a61f1 is described below
commit d1a61f16934c0c9423a507cc129d55ff5aace84a
Author: wen-hemin <39...@users.noreply.github.com>
AuthorDate: Mon Aug 2 10:32:32 2021 +0800
[Fix-5778]: The t_ds_schedules table, process_definition_id change to process_definition_code (#5928)
* fix: the t_ds_schedules table, process_definition_id change to process_definition_code
* fix checkstyle
* fix: recovery code
* fix UT
Co-authored-by: wen-hemin <we...@apache.com>
---
.../api/service/impl/ExecutorServiceImpl.java | 2 +-
.../service/impl/ProcessDefinitionServiceImpl.java | 8 +-
.../api/service/impl/SchedulerServiceImpl.java | 14 +--
.../api/service/ExecutorServiceTest.java | 12 +--
.../api/service/ProcessDefinitionServiceTest.java | 18 +---
.../api/service/SchedulerServiceTest.java | 12 ++-
.../dolphinscheduler/dao/entity/Schedule.java | 15 +--
.../dao/mapper/ScheduleMapper.java | 32 +++---
.../dao/mapper/ProcessDefinitionMapper.xml | 6 +-
.../dolphinscheduler/dao/mapper/ScheduleMapper.xml | 32 +++---
.../dao/mapper/WorkFlowLineageMapper.xml | 4 +-
.../dao/mapper/ScheduleMapperTest.java | 44 ++++----
.../dao/mapper/WorkFlowLineageMapperTest.java | 7 +-
.../server/master/runner/MasterExecThread.java | 4 +-
.../server/master/MasterExecThreadTest.java | 13 +--
.../service/process/ProcessService.java | 120 ++++-----------------
.../service/quartz/ProcessScheduleJob.java | 6 +-
sql/dolphinscheduler_mysql.sql | 2 +-
sql/dolphinscheduler_postgre.sql | 2 +-
19 files changed, 134 insertions(+), 219 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 1d944d0..8978257 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -549,7 +549,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
} else if (runMode == RunMode.RUN_MODE_PARALLEL) {
- List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
+ List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineId); // TODO: next pr change to code
List<Date> listDate = new LinkedList<>();
if (!CollectionUtils.isEmpty(schedules)) {
for (Schedule item : schedules) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index b28e899..65e2b10 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -555,7 +555,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
// get the timing according to the process definition
- List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
+ List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionId);
if (!schedules.isEmpty() && schedules.size() > 1) {
logger.warn("scheduler num is {},Greater than 1", schedules.size());
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
@@ -630,7 +630,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray(
- new int[]{processDefinition.getId()}
+ new long[]{processDefinition.getCode()}
);
for (Schedule schedule : scheduleList) {
@@ -712,7 +712,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return DagDataSchedule
*/
public DagDataSchedule exportProcessDagData(ProcessDefinition processDefinition) {
- List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinition.getId());
+ List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode());
DagDataSchedule dagDataSchedule = new DagDataSchedule(processService.genDagData(processDefinition));
if (!schedules.isEmpty()) {
Schedule schedule = schedules.get(0);
@@ -821,7 +821,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Schedule schedule = dagDataSchedule.getSchedule();
if (null != schedule) {
ProcessDefinition newProcessDefinition = processDefinitionMapper.queryByCode(processDefinition.getCode());
- schedule.setProcessDefinitionId(newProcessDefinition.getId());
+ schedule.setProcessDefinitionCode(newProcessDefinition.getCode());
schedule.setUserId(loginUser.getId());
schedule.setCreateTime(now);
schedule.setUpdateTime(now);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 350ab9a..edc7074 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -140,7 +140,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Date now = new Date();
scheduleObj.setProjectName(project.getName());
- scheduleObj.setProcessDefinitionId(processDefinition.getId());
+ scheduleObj.setProcessDefinitionCode(processDefineCode);
scheduleObj.setProcessDefinitionName(processDefinition.getName());
ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
@@ -228,9 +228,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
- ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId());
+ ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
if (processDefinition == null) {
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionId());
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionCode());
return result;
}
@@ -326,9 +326,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
return result;
}
- ProcessDefinition processDefinition = processService.findProcessDefineById(scheduleObj.getProcessDefinitionId());
+ ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null) {
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionId());
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode());
return result;
}
@@ -342,7 +342,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
}
// check sub process definition release state
List<Integer> subProcessDefineIds = new ArrayList<>();
- processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
+ processService.recurseFindSubProcessId(processDefinition.getId(), subProcessDefineIds);
Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]);
if (!subProcessDefineIds.isEmpty()) {
List<ProcessDefinition> subProcessDefinitionList =
@@ -430,7 +430,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
}
Page<Schedule> page = new Page<>(pageNo, pageSize);
- IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(page, processDefinition.getId(),
+ IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode,
searchVal);
PageInfo<Schedule> pageInfo = new PageInfo<>(pageNo, pageSize);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 624fa6e..2cae051 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -152,7 +152,7 @@ public class ExecutorServiceTest {
@Test
public void testNoComplement() {
- Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.START_PROCESS,
null, null,
@@ -170,7 +170,7 @@ public class ExecutorServiceTest {
@Test
public void testComplementWithStartNodeList() {
- Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.START_PROCESS,
null, "n1,n2",
@@ -188,7 +188,7 @@ public class ExecutorServiceTest {
@Test
public void testDateError() {
- Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
null, null,
@@ -205,7 +205,7 @@ public class ExecutorServiceTest {
@Test
public void testSerial() {
- Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
@@ -222,7 +222,7 @@ public class ExecutorServiceTest {
@Test
public void testParallelWithOutSchedule() {
- Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
@@ -240,7 +240,7 @@ public class ExecutorServiceTest {
@Test
public void testParallelWithSchedule() {
- Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(oneSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index c25fa59..f6a6a7f 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.api.service;
import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
@@ -34,8 +33,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.DagData;
@@ -55,10 +52,6 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.http.entity.ContentType;
-
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -70,7 +63,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.junit.Assert;
@@ -80,8 +72,6 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
-import org.springframework.mock.web.MockMultipartFile;
-import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -516,7 +506,7 @@ public class ProcessDefinitionServiceTest {
List<Schedule> schedules = new ArrayList<>();
schedules.add(getSchedule());
schedules.add(getSchedule());
- Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
+ Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules);
Map<String, Object> schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS));
@@ -527,7 +517,7 @@ public class ProcessDefinitionServiceTest {
schedules.add(schedule);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
- Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
+ Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules);
Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS));
@@ -537,7 +527,7 @@ public class ProcessDefinitionServiceTest {
schedules.add(schedule);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
- Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
+ Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0);
Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
@@ -950,7 +940,7 @@ public class ProcessDefinitionServiceTest {
Date date = new Date();
Schedule schedule = new Schedule();
schedule.setId(46);
- schedule.setProcessDefinitionId(1);
+ schedule.setProcessDefinitionCode(1);
schedule.setStartTime(date);
schedule.setEndTime(date);
schedule.setCrontab("0 0 5 * * ? *");
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
index 7cd383a..238d1d0 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -71,6 +72,9 @@ public class SchedulerServiceTest {
private ProjectMapper projectMapper;
@Mock
+ private ProcessDefinitionMapper processDefinitionMapper;
+
+ @Mock
private ProjectServiceImpl projectService;
@Mock
@@ -102,7 +106,7 @@ public class SchedulerServiceTest {
Schedule schedule = new Schedule();
schedule.setId(1);
- schedule.setProcessDefinitionId(1);
+ schedule.setProcessDefinitionCode(1);
schedule.setReleaseState(ReleaseState.OFFLINE);
List<Server> masterServers = new ArrayList<>();
@@ -113,7 +117,7 @@ public class SchedulerServiceTest {
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
- Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition);
+ Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition);
//hash no auth
result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
@@ -128,10 +132,10 @@ public class SchedulerServiceTest {
Assert.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, result.get(Constants.STATUS));
//PROCESS_DEFINE_NOT_EXIST
- schedule.setProcessDefinitionId(2);
+ schedule.setProcessDefinitionCode(2);
result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, result.get(Constants.STATUS));
- schedule.setProcessDefinitionId(1);
+ schedule.setProcessDefinitionCode(1);
// PROCESS_DEFINE_NOT_RELEASE
result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java
index 74ed5c1..e1b4c90 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java
@@ -39,10 +39,11 @@ public class Schedule {
@TableId(value = "id", type = IdType.AUTO)
private int id;
+
/**
- * process definition id
+ * process definition code
*/
- private int processDefinitionId;
+ private long processDefinitionCode;
/**
* process definition name
@@ -222,12 +223,12 @@ public class Schedule {
this.releaseState = releaseState;
}
- public int getProcessDefinitionId() {
- return processDefinitionId;
+ public long getProcessDefinitionCode() {
+ return processDefinitionCode;
}
- public void setProcessDefinitionId(int processDefinitionId) {
- this.processDefinitionId = processDefinitionId;
+ public void setProcessDefinitionCode(long processDefinitionCode) {
+ this.processDefinitionCode = processDefinitionCode;
}
public String getProcessDefinitionName() {
@@ -290,7 +291,7 @@ public class Schedule {
public String toString() {
return "Schedule{"
+ "id=" + id
- + ", processDefinitionId=" + processDefinitionId
+ + ", processDefinitionCode=" + processDefinitionCode
+ ", processDefinitionName='" + processDefinitionName + '\''
+ ", projectName='" + projectName + '\''
+ ", description='" + definitionDescription + '\''
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index 225677d..37fae5d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -17,12 +17,14 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.Schedule;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
+
import org.apache.ibatis.annotations.Param;
import java.util.List;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+
/**
* scheduler mapper interface
*/
@@ -31,13 +33,13 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
/**
* scheduler page
* @param page page
- * @param processDefinitionId processDefinitionId
+ * @param processDefinitionCode processDefinitionCode
* @param searchVal searchVal
* @return scheduler IPage
*/
- IPage<Schedule> queryByProcessDefineIdPaging(IPage<Schedule> page,
- @Param("processDefinitionId") int processDefinitionId,
- @Param("searchVal") String searchVal);
+ IPage<Schedule> queryByProcessDefineCodePaging(IPage<Schedule> page,
+ @Param("processDefinitionCode") long processDefinitionCode,
+ @Param("searchVal") String searchVal);
/**
* query schedule list by project name
@@ -47,24 +49,24 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
List<Schedule> querySchedulerListByProjectName(@Param("projectName") String projectName);
/**
- * query schedule list by process definition ids
- * @param processDefineIds processDefineIds
+ * query schedule list by process definition codes
+ * @param processDefineCodes processDefineCodes
* @return schedule list
*/
- List<Schedule> selectAllByProcessDefineArray(@Param("processDefineIds") int[] processDefineIds);
+ List<Schedule> selectAllByProcessDefineArray(@Param("processDefineCodes") long[] processDefineCodes);
/**
- * query schedule list by process definition id
- * @param processDefinitionId processDefinitionId
+ * query schedule list by process definition code
+ * @param processDefinitionCode processDefinitionCode
* @return schedule list
*/
- List<Schedule> queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);
+ List<Schedule> queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/**
- * query schedule list by process definition id
- * @param processDefinitionId processDefinitionId
+ * query schedule list by process definition code
+ * @param processDefinitionCode processDefinitionCode
* @return schedule list
*/
- List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);
+ List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
index 0ede680..590e447 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
@@ -72,8 +72,8 @@
td.global_params, td.flag, td.warning_group_id, td.timeout, td.tenant_id, td.update_time, td.create_time,
sc.schedule_release_state, tu.user_name
FROM t_ds_process_definition td
- left join (select process_definition_id,release_state as schedule_release_state from t_ds_schedules group by
- process_definition_id,release_state) sc on sc.process_definition_id = td.id
+ left join (select process_definition_code,release_state as schedule_release_state from t_ds_schedules group by
+ process_definition_code,release_state) sc on sc.process_definition_code = td.code
left join t_ds_user tu on td.user_id = tu.id
where td.project_code = #{projectCode}
<if test=" searchVal != null and searchVal != ''">
@@ -120,7 +120,7 @@
</if>
group by td.user_id,tu.user_name
</select>
-
+
<select id="queryByDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
SELECT
pd.id, pd.code, pd.name, pd.version, pd.release_state, pd.project_code, pd.user_id, pd.description,
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
index 587680f..482d30b 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
@@ -19,32 +19,34 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ScheduleMapper">
<sql id="baseSql">
- id, process_definition_id, start_time, end_time, timezone_id, crontab, failure_strategy, user_id, release_state,
+ id, process_definition_code, start_time, end_time, timezone_id, crontab, failure_strategy, user_id, release_state,
warning_type, warning_group_id, process_instance_priority, worker_group, create_time, update_time
</sql>
<sql id="baseSqlV2">
- ${alias}.id, ${alias}.process_definition_id, ${alias}.start_time, ${alias}.end_time, ${alias}.timezone_id, ${alias}.crontab, ${alias}.failure_strategy, ${alias}.user_id, ${alias}.release_state,
- ${alias}.warning_type, ${alias}.warning_group_id, ${alias}.process_instance_priority, ${alias}.worker_group, ${alias}.create_time, ${alias}.update_time
+ ${alias}.id, ${alias}.process_definition_code, ${alias}.start_time, ${alias}.end_time, ${alias}.timezone_id,
+ ${alias}.crontab, ${alias}.failure_strategy, ${alias}.user_id, ${alias}.release_state, ${alias}.warning_type,
+ ${alias}.warning_group_id, ${alias}.process_instance_priority, ${alias}.worker_group, ${alias}.create_time,
+ ${alias}.update_time
</sql>
- <select id="queryByProcessDefineIdPaging" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
+ <select id="queryByProcessDefineCodePaging" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
select p_f.name as process_definition_name, p.name as project_name,u.user_name,
<include refid="baseSqlV2">
<property name="alias" value="s"/>
</include>
from t_ds_schedules s
- join t_ds_process_definition p_f on s.process_definition_id = p_f.id
+ join t_ds_process_definition p_f on s.process_definition_code = p_f.code
join t_ds_project as p on p_f.project_code = p.code
join t_ds_user as u on s.user_id = u.id
where 1=1
- <if test="processDefinitionId!= 0">
- and s.process_definition_id = #{processDefinitionId}
+ <if test="processDefinitionCode != 0">
+ and s.process_definition_code = #{processDefinitionCode}
</if>
order by s.update_time desc
</select>
<select id="querySchedulerListByProjectName" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
select p_f.name as process_definition_name, p_f.description as definition_description, p.name as project_name,u.user_name,s.*
from t_ds_schedules s
- join t_ds_process_definition p_f on s.process_definition_id = p_f.id
+ join t_ds_process_definition p_f on s.process_definition_code = p_f.code
join t_ds_project as p on p_f.project_code = p.code
join t_ds_user as u on s.user_id = u.id
where p.name = #{projectName}
@@ -54,25 +56,25 @@
<include refid="baseSql"/>
from t_ds_schedules
where 1= 1
- <if test="processDefineIds != null and processDefineIds.length != 0 ">
- and process_definition_id in
- <foreach collection="processDefineIds" index="index" item="i" open="(" separator="," close=")">
+ <if test="processDefineCodes != null and processDefineCodes.length != 0 ">
+ and process_definition_code in
+ <foreach collection="processDefineCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
and release_state = 1
</select>
- <select id="queryByProcessDefinitionId" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
+ <select id="queryByProcessDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
select
<include refid="baseSql"/>
from t_ds_schedules
- where process_definition_id =#{processDefinitionId}
+ where process_definition_code = #{processDefinitionCode}
</select>
- <select id="queryReleaseSchedulerListByProcessDefinitionId"
+ <select id="queryReleaseSchedulerListByProcessDefinitionCode"
resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
select
<include refid="baseSql"/>
from t_ds_schedules
- where process_definition_id =#{processDefinitionId} and release_state = 1
+ where process_definition_code = #{processDefinitionCode} and release_state = 1
</select>
</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index b78113b..eeddaf5 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -22,7 +22,7 @@
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.id as work_flow_id,tepd.name as work_flow_name
from t_ds_process_definition tepd
- left join t_ds_schedules tes on tepd.id = tes.process_definition_id
+ left join t_ds_schedules tes on tepd.code = tes.process_definition_code
where tepd.project_code = #{projectCode}
<if test="searchVal != null and searchVal != ''">
and tepd.name like concat('%', #{searchVal}, '%')
@@ -74,7 +74,7 @@
tes.crontab as crontab,
tes.release_state as schedule_publish_status
from t_ds_process_definition tepd
- left join t_ds_schedules tes on tepd.id = tes.process_definition_id
+ left join t_ds_schedules tes on tepd.code = tes.process_definition_code
where tepd.project_code = #{projectCode} and tepd.code = #{processDefinitionCode}
</select>
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
index e55aaca..752eca4 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.mapper;
+package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
@@ -24,8 +24,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
+import java.util.Date;
+import java.util.List;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -35,8 +37,8 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
-import java.util.Date;
-import java.util.List;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@RunWith(SpringRunner.class)
@SpringBootTest
@@ -61,7 +63,7 @@ public class ScheduleMapperTest {
* insert
* @return Schedule
*/
- private Schedule insertOne(){
+ private Schedule insertOne() {
//insertOne
Schedule schedule = new Schedule();
schedule.setStartTime(new Date());
@@ -80,7 +82,7 @@ public class ScheduleMapperTest {
* test update
*/
@Test
- public void testUpdate(){
+ public void testUpdate() {
//insertOne
Schedule schedule = insertOne();
schedule.setCreateTime(new Date());
@@ -93,7 +95,7 @@ public class ScheduleMapperTest {
* test delete
*/
@Test
- public void testDelete(){
+ public void testDelete() {
Schedule schedule = insertOne();
int delete = scheduleMapper.deleteById(schedule.getId());
Assert.assertEquals(delete, 1);
@@ -138,18 +140,15 @@ public class ScheduleMapperTest {
processDefinition.setUpdateTime(new Date());
processDefinitionMapper.insert(processDefinition);
- Schedule schedule= insertOne();
+ Schedule schedule = insertOne();
schedule.setUserId(user.getId());
- schedule.setProcessDefinitionId(processDefinition.getId());
+ schedule.setProcessDefinitionCode(processDefinition.getCode());
scheduleMapper.insert(schedule);
Page<Schedule> page = new Page(1,3);
- IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(page,
- processDefinition.getId(), ""
- );
+ IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineCodePaging(page,
+ processDefinition.getCode(), "");
Assert.assertNotEquals(scheduleIPage.getSize(), 0);
-
-
}
/**
@@ -158,7 +157,6 @@ public class ScheduleMapperTest {
@Test
public void testQuerySchedulerListByProjectName() {
-
User user = new User();
user.setUserName("ut name");
userMapper.insert(user);
@@ -181,9 +179,9 @@ public class ScheduleMapperTest {
processDefinition.setUpdateTime(new Date());
processDefinitionMapper.insert(processDefinition);
- Schedule schedule= insertOne();
+ Schedule schedule = insertOne();
schedule.setUserId(user.getId());
- schedule.setProcessDefinitionId(processDefinition.getId());
+ schedule.setProcessDefinitionCode(processDefinition.getCode());
scheduleMapper.insert(schedule);
Page<Schedule> page = new Page(1,3);
@@ -201,11 +199,11 @@ public class ScheduleMapperTest {
public void testSelectAllByProcessDefineArray() {
Schedule schedule = insertOne();
- schedule.setProcessDefinitionId(12345);
+ schedule.setProcessDefinitionCode(12345);
schedule.setReleaseState(ReleaseState.ONLINE);
scheduleMapper.updateById(schedule);
- List<Schedule> schedules= scheduleMapper.selectAllByProcessDefineArray(new int[] {schedule.getProcessDefinitionId()});
+ List<Schedule> schedules = scheduleMapper.selectAllByProcessDefineArray(new long[] {schedule.getProcessDefinitionCode()});
Assert.assertNotEquals(schedules.size(), 0);
}
@@ -215,10 +213,10 @@ public class ScheduleMapperTest {
@Test
public void queryByProcessDefinitionId() {
Schedule schedule = insertOne();
- schedule.setProcessDefinitionId(12345);
+ schedule.setProcessDefinitionCode(12345);
scheduleMapper.updateById(schedule);
- List<Schedule> schedules= scheduleMapper.queryByProcessDefinitionId(schedule.getProcessDefinitionId());
+ List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode());
Assert.assertNotEquals(schedules.size(), 0);
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
index 28e9dc2..cc60ce4 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java
@@ -112,7 +112,7 @@ public class WorkFlowLineageMapperTest {
schedule.setWarningType(WarningType.NONE);
schedule.setCreateTime(new Date());
schedule.setUpdateTime(new Date());
- schedule.setProcessDefinitionId(id);
+ schedule.setProcessDefinitionCode(id);
scheduleMapper.insert(schedule);
return schedule;
}
@@ -131,8 +131,9 @@ public class WorkFlowLineageMapperTest {
public void testQueryCodeRelation() {
ProcessTaskRelation processTaskRelation = insertOneProcessTaskRelation();
- List<ProcessLineage> workFlowLineages = workFlowLineageMapper.queryCodeRelation(processTaskRelation.getPreTaskCode()
- , processTaskRelation.getPreTaskVersion(), 11L, 1L);
+ List<ProcessLineage> workFlowLineages = workFlowLineageMapper.queryCodeRelation(
+ processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion(),
+ processTaskRelation.getProcessDefinitionCode(), processTaskRelation.getProjectCode());
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 1863087..72f7b47 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -255,8 +255,8 @@ public class MasterExecThread implements Runnable {
processService.saveProcessInstance(processInstance);
// get schedules
- int processDefinitionId = processInstance.getProcessDefinition().getId();
- List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
+ List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
+ processInstance.getProcessDefinitionCode());
List<Date> listDate = Lists.newLinkedList();
if (!CollectionUtils.isEmpty(schedules)) {
for (Schedule schedule : schedules) {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
index fbc4ed8..196fb54 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
@@ -76,7 +76,7 @@ public class MasterExecThreadTest {
private ProcessService processService;
- private int processDefinitionId = 1;
+ private long processDefinitionCode = 1L;
private MasterConfig config;
@@ -104,6 +104,7 @@ public class MasterExecThreadTest {
processDefinition.setGlobalParamMap(Collections.EMPTY_MAP);
processDefinition.setGlobalParamList(Collections.EMPTY_LIST);
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
+ Mockito.when(processInstance.getProcessDefinitionCode()).thenReturn(processDefinitionCode);
masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService, null, null, config));
// prepareProcess init dag
@@ -120,9 +121,9 @@ public class MasterExecThreadTest {
* without schedule
*/
@Test
- public void testParallelWithOutSchedule() throws ParseException {
+ public void testParallelWithOutSchedule() {
try {
- Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true);
method.invoke(masterExecThread);
@@ -140,12 +141,12 @@ public class MasterExecThreadTest {
@Test
public void testParallelWithSchedule() {
try {
- Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(oneSchedulerList());
Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 9(1 to 20 step 2) for next save, and last day 31 no save
- verify(processService, times(20)).saveProcessInstance(processInstance);
+ verify(processService, times(9)).saveProcessInstance(processInstance);
} catch (Exception e) {
Assert.fail();
}
@@ -267,4 +268,4 @@ public class MasterExecThreadTest {
return schedulerList;
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 83b5e8c..01e4678 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ConditionType;
-import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -64,7 +63,6 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeExceptio
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.CycleDependency;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
@@ -108,11 +106,9 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
-import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Calendar;
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
@@ -125,14 +121,12 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
-import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
-import com.cronutils.model.Cron;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -404,6 +398,16 @@ public class ProcessService {
}
/**
+ * find process define by code.
+ *
+ * @param processDefinitionCode processDefinitionCode
+ * @return process definition
+ */
+ public ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode) {
+ return processDefineMapper.queryByCode(processDefinitionCode);
+ }
+
+ /**
* delete work process instance by id
*
* @param processInstanceId processInstanceId
@@ -483,7 +487,6 @@ public class ProcessService {
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinitionId(parentId);
-
if (taskNodeList != null && !taskNodeList.isEmpty()) {
for (TaskDefinition taskNode : taskNodeList) {
@@ -1628,7 +1631,6 @@ public class ProcessService {
taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams));
}
-
/**
* convert integer list to string list
*
@@ -1657,13 +1659,13 @@ public class ProcessService {
}
/**
- * query Schedule by processDefinitionId
+ * query Schedule by processDefinitionCode
*
- * @param processDefinitionId processDefinitionId
+ * @param processDefinitionCode processDefinitionCode
* @see Schedule
*/
- public List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(int processDefinitionId) {
- return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
+ public List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode) {
+ return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
}
/**
@@ -1730,7 +1732,6 @@ public class ProcessService {
ProcessInstance instance = processInstanceMapper.selectById(processInstanceId);
instance.setState(executionStatus);
return processInstanceMapper.updateById(instance);
-
}
/**
@@ -1785,98 +1786,13 @@ public class ProcessService {
}
/**
- * find schedule list by process define id.
+ * find schedule list by process define codes.
*
- * @param ids ids
+ * @param codes codes
* @return schedule list
*/
- public List<Schedule> selectAllByProcessDefineId(int[] ids) {
- return scheduleMapper.selectAllByProcessDefineArray(
- ids);
- }
-
- /**
- * get dependency cycle by work process define id and scheduler fire time
- *
- * @param masterId masterId
- * @param processDefinitionId processDefinitionId
- * @param scheduledFireTime the time the task schedule is expected to trigger
- * @return CycleDependency
- * @throws Exception if error throws Exception
- */
- public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception {
- List<CycleDependency> list = getCycleDependencies(masterId, new int[]{processDefinitionId}, scheduledFireTime);
- return !list.isEmpty() ? list.get(0) : null;
-
- }
-
- /**
- * get dependency cycle list by work process define id list and scheduler fire time
- *
- * @param masterId masterId
- * @param ids ids
- * @param scheduledFireTime the time the task schedule is expected to trigger
- * @return CycleDependency list
- * @throws Exception if error throws Exception
- */
- public List<CycleDependency> getCycleDependencies(int masterId, int[] ids, Date scheduledFireTime) throws Exception {
- List<CycleDependency> cycleDependencyList = new ArrayList<>();
- if (null == ids || ids.length == 0) {
- logger.warn("ids[] is empty!is invalid!");
- return cycleDependencyList;
- }
- if (scheduledFireTime == null) {
- logger.warn("scheduledFireTime is null!is invalid!");
- return cycleDependencyList;
- }
-
- String strCrontab = "";
- CronExpression depCronExpression;
- Cron depCron;
- List<Date> list;
- List<Schedule> schedules = this.selectAllByProcessDefineId(ids);
- // for all scheduling information
- for (Schedule depSchedule : schedules) {
- strCrontab = depSchedule.getCrontab();
- depCronExpression = CronUtils.parse2CronExpression(strCrontab);
- depCron = CronUtils.parse2Cron(strCrontab);
- CycleEnum cycleEnum = CronUtils.getMiniCycle(depCron);
- if (cycleEnum == null) {
- logger.error("{} is not valid", strCrontab);
- continue;
- }
- Calendar calendar = Calendar.getInstance();
- switch (cycleEnum) {
- case HOUR:
- calendar.add(Calendar.HOUR, -25);
- break;
- case DAY:
- case WEEK:
- calendar.add(Calendar.DATE, -32);
- break;
- case MONTH:
- calendar.add(Calendar.MONTH, -13);
- break;
- default:
- String cycleName = cycleEnum.name();
- logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleName);
- continue;
- }
- Date start = calendar.getTime();
-
- if (depSchedule.getProcessDefinitionId() == masterId) {
- list = CronUtils.getSelfFireDateList(start, scheduledFireTime, depCronExpression);
- } else {
- list = CronUtils.getFireDateList(start, scheduledFireTime, depCronExpression);
- }
- if (!list.isEmpty()) {
- start = list.get(list.size() - 1);
- CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(), start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
- cycleDependencyList.add(dependency);
- }
-
- }
- return cycleDependencyList;
+ public List<Schedule> selectAllByProcessDefineCode(long[] codes) {
+ return scheduleMapper.selectAllByProcessDefineArray(codes);
}
/**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index 2921ce2..bda8ad8 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -81,11 +81,11 @@ public class ProcessScheduleJob implements Job {
return;
}
- ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
+ ProcessDefinition processDefinition = getProcessService().findProcessDefinitionByCode(schedule.getProcessDefinitionCode());
// release state : online/offline
ReleaseState releaseState = processDefinition.getReleaseState();
if (releaseState == ReleaseState.OFFLINE) {
- logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, scheduleId);
+ logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, processDefinition.getId());
return;
}
@@ -93,7 +93,7 @@ public class ProcessScheduleJob implements Job {
command.setCommandType(CommandType.SCHEDULER);
command.setExecutorId(schedule.getUserId());
command.setFailureStrategy(schedule.getFailureStrategy());
- command.setProcessDefinitionId(schedule.getProcessDefinitionId());
+ //command.setProcessDefinitionId(schedule.getProcessDefinitionCode()); TODO next pr
command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId());
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index a6e3f97..11bdc8d 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -747,7 +747,7 @@ CREATE TABLE `t_ds_resources` (
DROP TABLE IF EXISTS `t_ds_schedules`;
CREATE TABLE `t_ds_schedules` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
- `process_definition_id` int(11) NOT NULL COMMENT 'process definition id',
+ `process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
`start_time` datetime NOT NULL COMMENT 'start time',
`end_time` datetime NOT NULL COMMENT 'end time',
`timezone_id` varchar(40) DEFAULT NULL COMMENT 'timezoneId',
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 88cf143..eb97912 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -611,7 +611,7 @@ CREATE TABLE t_ds_resources (
DROP TABLE IF EXISTS t_ds_schedules;
CREATE TABLE t_ds_schedules (
id int NOT NULL ,
- process_definition_id int NOT NULL ,
+ process_definition_code bigint NOT NULL ,
start_time timestamp NOT NULL ,
end_time timestamp NOT NULL ,
timezone_id varchar(40) default NULL ,