You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2021/11/25 02:22:49 UTC

[dolphinscheduler] branch dev updated: update schdule api (#6977)

This is an automated email from the ASF dual-hosted git repository.

wenhemin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5a04b8d  update schdule api (#6977)
5a04b8d is described below

commit 5a04b8d49aa8e30a60a3fdc0cdb3b2f6910aa084
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Thu Nov 25 10:22:41 2021 +0800

    update schdule api (#6977)
---
 .../apache/dolphinscheduler/api/enums/Status.java  |   2 +-
 .../service/impl/ProcessDefinitionServiceImpl.java |  27 ++--
 .../api/service/impl/SchedulerServiceImpl.java     | 146 +++++++++++++--------
 .../api/service/ProcessDefinitionServiceTest.java  |  18 +--
 .../dao/mapper/ScheduleMapper.java                 |   4 +-
 .../dao/mapper/ScheduleMapperTest.java             |   6 +-
 .../server/PythonGatewayServer.java                |   6 +-
 7 files changed, 120 insertions(+), 89 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 506e6a6..4fbec8e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -269,7 +269,7 @@ public enum Status {
     CREATE_PROCESS_TASK_RELATION_ERROR(50032, "create process task relation error", "创建工作流任务关系错误"),
     PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"),
     PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
-    PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"),
+    PROCESS_DAG_IS_EMPTY(50035, "process dag is empty", "工作流dag是空"),
     CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"),
     CREATE_TASK_DEFINITION_ERROR(50037, "create task definition error", "创建任务错误"),
     UPDATE_TASK_DEFINITION_ERROR(50038, "update task definition error", "更新任务定义错误"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 17ca1dd..1276717 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -664,21 +664,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
 
         // get the timing according to the process definition
-        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(code);
-        if (!schedules.isEmpty() && schedules.size() > 1) {
-            logger.warn("scheduler num is {},Greater than 1", schedules.size());
-            putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
-            return result;
-        } else if (schedules.size() == 1) {
-            Schedule schedule = schedules.get(0);
-            if (schedule.getReleaseState() == ReleaseState.OFFLINE) {
-                int delete = scheduleMapper.deleteById(schedule.getId());
+        Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code);
+        if (scheduleObj != null) {
+            if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) {
+                int delete = scheduleMapper.deleteById(scheduleObj.getId());
                 if (delete == 0) {
                     putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
                     throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
                 }
-            } else if (schedule.getReleaseState() == ReleaseState.ONLINE) {
-                putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
+            }
+            if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) {
+                putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId());
                 return result;
             }
         }
@@ -815,12 +811,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @return DagDataSchedule
      */
     public DagDataSchedule exportProcessDagData(ProcessDefinition processDefinition) {
-        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode());
+        Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode());
         DagDataSchedule dagDataSchedule = new DagDataSchedule(processService.genDagData(processDefinition));
-        if (!schedules.isEmpty()) {
-            Schedule schedule = schedules.get(0);
-            schedule.setReleaseState(ReleaseState.OFFLINE);
-            dagDataSchedule.setSchedule(schedule);
+        if (scheduleObj != null) {
+            scheduleObj.setReleaseState(ReleaseState.OFFLINE);
+            dagDataSchedule.setSchedule(scheduleObj);
         }
         return dagDataSchedule;
     }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 0a920aa..7658efe 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -36,10 +36,12 @@ import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -101,6 +103,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
     @Autowired
     private Scheduler scheduler;
 
+    @Autowired
+    private ProcessTaskRelationMapper processTaskRelationMapper;
+
     /**
      * save schedule
      *
@@ -247,57 +252,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
             return result;
         }
 
-        /**
-         * scheduling on-line status forbid modification
-         */
-        if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
-            return result;
-        }
-
-        Date now = new Date();
-
-        // updateProcessInstance param
-        if (!StringUtils.isEmpty(scheduleExpression)) {
-            ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
-            if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
-                logger.warn("The start time must not be the same as the end");
-                putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
-                return result;
-            }
-            schedule.setStartTime(scheduleParam.getStartTime());
-            schedule.setEndTime(scheduleParam.getEndTime());
-            if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
-                putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
-                return result;
-            }
-            schedule.setCrontab(scheduleParam.getCrontab());
-            schedule.setTimezoneId(scheduleParam.getTimezoneId());
-        }
-
-        if (warningType != null) {
-            schedule.setWarningType(warningType);
-        }
-
-        schedule.setWarningGroupId(warningGroupId);
-
-        if (failureStrategy != null) {
-            schedule.setFailureStrategy(failureStrategy);
-        }
-
-        schedule.setWorkerGroup(workerGroup);
-        schedule.setEnvironmentCode(environmentCode);
-        schedule.setUpdateTime(now);
-        schedule.setProcessInstancePriority(processInstancePriority);
-        scheduleMapper.updateById(schedule);
-
-        /**
-         * updateProcessInstance recipients and cc by process definition ID
-         */
-        processDefinition.setWarningGroupId(warningGroupId);
-
-        processDefinitionMapper.updateById(processDefinition);
-
-        putMsg(result, Status.SUCCESS);
+        updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode);
         return result;
     }
 
@@ -345,7 +300,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode());
             return result;
         }
-
+        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
+        if (processTaskRelations.isEmpty()) {
+            putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
+            return result;
+        }
         if (scheduleStatus == ReleaseState.ONLINE) {
             // check process definition release state
             if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
@@ -633,6 +592,87 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
                                                                      Priority processInstancePriority,
                                                                      String workerGroup,
                                                                      long environmentCode) {
-        return null;
+        Project project = projectMapper.queryByCode(projectCode);
+        //check user access for project
+        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+        // check schedule exists
+        Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
+        if (schedule == null) {
+            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, processDefinitionCode);
+            return result;
+        }
+
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
+            return result;
+        }
+
+        updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode);
+        return result;
+    }
+
+    private void updateSchedule(Map<String, Object> result,
+                                Schedule schedule,
+                                ProcessDefinition processDefinition,
+                                String scheduleExpression,
+                                WarningType warningType,
+                                int warningGroupId,
+                                FailureStrategy failureStrategy,
+                                Priority processInstancePriority,
+                                String workerGroup,
+                                long environmentCode) {
+        if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
+            return;
+        }
+
+        Date now = new Date();
+
+        // updateProcessInstance param
+        if (!StringUtils.isEmpty(scheduleExpression)) {
+            ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
+            if (scheduleParam == null) {
+                putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
+                return;
+            }
+            if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
+                logger.warn("The start time must not be the same as the end");
+                putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
+                return;
+            }
+            schedule.setStartTime(scheduleParam.getStartTime());
+            schedule.setEndTime(scheduleParam.getEndTime());
+            if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
+                putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
+                return;
+            }
+            schedule.setCrontab(scheduleParam.getCrontab());
+            schedule.setTimezoneId(scheduleParam.getTimezoneId());
+        }
+
+        if (warningType != null) {
+            schedule.setWarningType(warningType);
+        }
+
+        schedule.setWarningGroupId(warningGroupId);
+
+        if (failureStrategy != null) {
+            schedule.setFailureStrategy(failureStrategy);
+        }
+
+        schedule.setWorkerGroup(workerGroup);
+        schedule.setEnvironmentCode(environmentCode);
+        schedule.setUpdateTime(now);
+        schedule.setProcessInstancePriority(processInstancePriority);
+        scheduleMapper.updateById(schedule);
+
+        processDefinition.setWarningGroupId(warningGroupId);
+
+        processDefinitionMapper.updateById(processDefinition);
+
+        putMsg(result, Status.SUCCESS);
     }
 }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 7fe02eb..964555b 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -378,32 +378,28 @@ public class ProcessDefinitionServiceTest {
         Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
-        List<Schedule> schedules = new ArrayList<>();
-        schedules.add(getSchedule());
-        schedules.add(getSchedule());
-        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
+        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
+        Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
+        Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1);
+        Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
         Map<String, Object> schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
-        Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS));
+        Assert.assertEquals(Status.SUCCESS, schedulerGreaterThanOneRes.get(Constants.STATUS));
 
         //scheduler online
-        schedules.clear();
         Schedule schedule = getSchedule();
         schedule.setReleaseState(ReleaseState.ONLINE);
-        schedules.add(schedule);
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
-        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
+        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedule);
         Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
         Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS));
 
         //delete success
-        schedules.clear();
         schedule.setReleaseState(ReleaseState.OFFLINE);
-        schedules.add(schedule);
         Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
         Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
         Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
-        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
+        Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
         Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index 37fae5d..be7369c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -58,9 +58,9 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
     /**
      * query schedule list by process definition code
      * @param processDefinitionCode processDefinitionCode
-     * @return schedule list
+     * @return schedule
      */
-    List<Schedule> queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
+    Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
 
     /**
      * query schedule list by process definition code
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
index 65874c9..be4ff6f 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
@@ -200,12 +200,12 @@ public class ScheduleMapperTest extends BaseDaoTest {
      * test query by process definition id
      */
     @Test
-    public void queryByProcessDefinitionId() {
+    public void queryByProcessDefinitionCode() {
         Schedule schedule = insertOne();
         schedule.setProcessDefinitionCode(12345);
         scheduleMapper.updateById(schedule);
 
-        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode());
-        Assert.assertNotEquals(schedules.size(), 0);
+        Schedule schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode());
+        Assert.assertNotNull(schedules);
     }
 }
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index 77c2344..531c55b 100644
--- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -259,16 +259,16 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
                                         long processDefinitionCode,
                                         String schedule,
                                         String workerGroup) {
-        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
+        Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
         // create or update schedule
         int scheduleId;
-        if (schedules.isEmpty()) {
+        if (scheduleObj == null) {
             processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
             Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE,
                 DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
             scheduleId = (int) result.get("scheduleId");
         } else {
-            scheduleId = schedules.get(0).getId();
+            scheduleId = scheduleObj.getId();
             processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
             schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE,
                 DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);