You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2021/11/25 02:22:49 UTC
[dolphinscheduler] branch dev updated: update schdule api (#6977)
This is an automated email from the ASF dual-hosted git repository.
wenhemin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5a04b8d update schdule api (#6977)
5a04b8d is described below
commit 5a04b8d49aa8e30a60a3fdc0cdb3b2f6910aa084
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Thu Nov 25 10:22:41 2021 +0800
update schdule api (#6977)
---
.../apache/dolphinscheduler/api/enums/Status.java | 2 +-
.../service/impl/ProcessDefinitionServiceImpl.java | 27 ++--
.../api/service/impl/SchedulerServiceImpl.java | 146 +++++++++++++--------
.../api/service/ProcessDefinitionServiceTest.java | 18 +--
.../dao/mapper/ScheduleMapper.java | 4 +-
.../dao/mapper/ScheduleMapperTest.java | 6 +-
.../server/PythonGatewayServer.java | 6 +-
7 files changed, 120 insertions(+), 89 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 506e6a6..4fbec8e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -269,7 +269,7 @@ public enum Status {
CREATE_PROCESS_TASK_RELATION_ERROR(50032, "create process task relation error", "创建工作流任务关系错误"),
PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"),
PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
- PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"),
+ PROCESS_DAG_IS_EMPTY(50035, "process dag is empty", "工作流dag是空"),
CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"),
CREATE_TASK_DEFINITION_ERROR(50037, "create task definition error", "创建任务错误"),
UPDATE_TASK_DEFINITION_ERROR(50038, "update task definition error", "更新任务定义错误"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 17ca1dd..1276717 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -664,21 +664,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
// get the timing according to the process definition
- List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(code);
- if (!schedules.isEmpty() && schedules.size() > 1) {
- logger.warn("scheduler num is {},Greater than 1", schedules.size());
- putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
- return result;
- } else if (schedules.size() == 1) {
- Schedule schedule = schedules.get(0);
- if (schedule.getReleaseState() == ReleaseState.OFFLINE) {
- int delete = scheduleMapper.deleteById(schedule.getId());
+ Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code);
+ if (scheduleObj != null) {
+ if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) {
+ int delete = scheduleMapper.deleteById(scheduleObj.getId());
if (delete == 0) {
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
- } else if (schedule.getReleaseState() == ReleaseState.ONLINE) {
- putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
+ }
+ if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) {
+ putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId());
return result;
}
}
@@ -815,12 +811,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return DagDataSchedule
*/
public DagDataSchedule exportProcessDagData(ProcessDefinition processDefinition) {
- List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode());
+ Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode());
DagDataSchedule dagDataSchedule = new DagDataSchedule(processService.genDagData(processDefinition));
- if (!schedules.isEmpty()) {
- Schedule schedule = schedules.get(0);
- schedule.setReleaseState(ReleaseState.OFFLINE);
- dagDataSchedule.setSchedule(schedule);
+ if (scheduleObj != null) {
+ scheduleObj.setReleaseState(ReleaseState.OFFLINE);
+ dagDataSchedule.setSchedule(scheduleObj);
}
return dagDataSchedule;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 0a920aa..7658efe 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -36,10 +36,12 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -101,6 +103,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
@Autowired
private Scheduler scheduler;
+ @Autowired
+ private ProcessTaskRelationMapper processTaskRelationMapper;
+
/**
* save schedule
*
@@ -247,57 +252,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
- /**
- * scheduling on-line status forbid modification
- */
- if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
- return result;
- }
-
- Date now = new Date();
-
- // updateProcessInstance param
- if (!StringUtils.isEmpty(scheduleExpression)) {
- ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
- if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
- logger.warn("The start time must not be the same as the end");
- putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
- return result;
- }
- schedule.setStartTime(scheduleParam.getStartTime());
- schedule.setEndTime(scheduleParam.getEndTime());
- if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
- putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
- return result;
- }
- schedule.setCrontab(scheduleParam.getCrontab());
- schedule.setTimezoneId(scheduleParam.getTimezoneId());
- }
-
- if (warningType != null) {
- schedule.setWarningType(warningType);
- }
-
- schedule.setWarningGroupId(warningGroupId);
-
- if (failureStrategy != null) {
- schedule.setFailureStrategy(failureStrategy);
- }
-
- schedule.setWorkerGroup(workerGroup);
- schedule.setEnvironmentCode(environmentCode);
- schedule.setUpdateTime(now);
- schedule.setProcessInstancePriority(processInstancePriority);
- scheduleMapper.updateById(schedule);
-
- /**
- * updateProcessInstance recipients and cc by process definition ID
- */
- processDefinition.setWarningGroupId(warningGroupId);
-
- processDefinitionMapper.updateById(processDefinition);
-
- putMsg(result, Status.SUCCESS);
+ updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode);
return result;
}
@@ -345,7 +300,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode());
return result;
}
-
+ List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
+ if (processTaskRelations.isEmpty()) {
+ putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
+ return result;
+ }
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
@@ -633,6 +592,87 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Priority processInstancePriority,
String workerGroup,
long environmentCode) {
- return null;
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+ // check schedule exists
+ Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
+ if (schedule == null) {
+ putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, processDefinitionCode);
+ return result;
+ }
+
+ ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+ if (processDefinition == null) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
+ return result;
+ }
+
+ updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode);
+ return result;
+ }
+
+ private void updateSchedule(Map<String, Object> result,
+ Schedule schedule,
+ ProcessDefinition processDefinition,
+ String scheduleExpression,
+ WarningType warningType,
+ int warningGroupId,
+ FailureStrategy failureStrategy,
+ Priority processInstancePriority,
+ String workerGroup,
+ long environmentCode) {
+ if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
+ return;
+ }
+
+ Date now = new Date();
+
+ // updateProcessInstance param
+ if (!StringUtils.isEmpty(scheduleExpression)) {
+ ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
+ if (scheduleParam == null) {
+ putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
+ return;
+ }
+ if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
+ logger.warn("The start time must not be the same as the end");
+ putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
+ return;
+ }
+ schedule.setStartTime(scheduleParam.getStartTime());
+ schedule.setEndTime(scheduleParam.getEndTime());
+ if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
+ putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
+ return;
+ }
+ schedule.setCrontab(scheduleParam.getCrontab());
+ schedule.setTimezoneId(scheduleParam.getTimezoneId());
+ }
+
+ if (warningType != null) {
+ schedule.setWarningType(warningType);
+ }
+
+ schedule.setWarningGroupId(warningGroupId);
+
+ if (failureStrategy != null) {
+ schedule.setFailureStrategy(failureStrategy);
+ }
+
+ schedule.setWorkerGroup(workerGroup);
+ schedule.setEnvironmentCode(environmentCode);
+ schedule.setUpdateTime(now);
+ schedule.setProcessInstancePriority(processInstancePriority);
+ scheduleMapper.updateById(schedule);
+
+ processDefinition.setWarningGroupId(warningGroupId);
+
+ processDefinitionMapper.updateById(processDefinition);
+
+ putMsg(result, Status.SUCCESS);
}
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 7fe02eb..964555b 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -378,32 +378,28 @@ public class ProcessDefinitionServiceTest {
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
- List<Schedule> schedules = new ArrayList<>();
- schedules.add(getSchedule());
- schedules.add(getSchedule());
- Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
+ Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
+ Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
+ Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1);
+ Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
Map<String, Object> schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
- Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS));
+ Assert.assertEquals(Status.SUCCESS, schedulerGreaterThanOneRes.get(Constants.STATUS));
//scheduler online
- schedules.clear();
Schedule schedule = getSchedule();
schedule.setReleaseState(ReleaseState.ONLINE);
- schedules.add(schedule);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
- Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
+ Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedule);
Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS));
//delete success
- schedules.clear();
schedule.setReleaseState(ReleaseState.OFFLINE);
- schedules.add(schedule);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
- Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
+ Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index 37fae5d..be7369c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -58,9 +58,9 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
/**
* query schedule list by process definition code
* @param processDefinitionCode processDefinitionCode
- * @return schedule list
+ * @return schedule
*/
- List<Schedule> queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
+ Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/**
* query schedule list by process definition code
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
index 65874c9..be4ff6f 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
@@ -200,12 +200,12 @@ public class ScheduleMapperTest extends BaseDaoTest {
* test query by process definition id
*/
@Test
- public void queryByProcessDefinitionId() {
+ public void queryByProcessDefinitionCode() {
Schedule schedule = insertOne();
schedule.setProcessDefinitionCode(12345);
scheduleMapper.updateById(schedule);
- List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode());
- Assert.assertNotEquals(schedules.size(), 0);
+ Schedule schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode());
+ Assert.assertNotNull(schedules);
}
}
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index 77c2344..531c55b 100644
--- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -259,16 +259,16 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
long processDefinitionCode,
String schedule,
String workerGroup) {
- List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
+ Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
// create or update schedule
int scheduleId;
- if (schedules.isEmpty()) {
+ if (scheduleObj == null) {
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
scheduleId = (int) result.get("scheduleId");
} else {
- scheduleId = schedules.get(0).getId();
+ scheduleId = scheduleObj.getId();
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);