You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/30 01:59:28 UTC

[dolphinscheduler] branch dev updated: [fix][python] schedule unexpect offline each time update from api (#13301)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new d42f576268 [fix][python] schedule unexpect offline each time update from api (#13301)
d42f576268 is described below

commit d42f576268545f429c22bd3f10e3b64ac0496629
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Fri Dec 30 09:59:16 2022 +0800

    [fix][python] schedule unexpect offline each time update from api (#13301)
    
    * Always set the workflow online before setting the schedule online
    * Avoid returning a map from the setScheduleState interface
---
 .../api/controller/SchedulerController.java        | 10 ++--
 .../dolphinscheduler/api/python/PythonGateway.java |  2 +
 .../api/service/SchedulerService.java              |  9 ++-
 .../api/service/impl/SchedulerServiceImpl.java     | 44 +++++----------
 .../api/controller/SchedulerControllerTest.java    | 10 ++--
 .../api/service/SchedulerServiceTest.java          | 65 +++++++++++++---------
 6 files changed, 68 insertions(+), 72 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
index 7c555b23c1..b4e0e8853b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
@@ -193,8 +193,8 @@ public class SchedulerController extends BaseController {
     public Result publishScheduleOnline(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
                                         @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                                         @PathVariable("id") Integer id) {
-        Map<String, Object> result = schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.ONLINE);
-        return returnDataList(result);
+        schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.ONLINE);
+        return Result.success();
     }
 
     /**
@@ -215,10 +215,8 @@ public class SchedulerController extends BaseController {
     public Result offlineSchedule(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
                                   @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                                   @PathVariable("id") Integer id) {
-
-        Map<String, Object> result =
-                schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.OFFLINE);
-        return returnDataList(result);
+        schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.OFFLINE);
+        return Result.success();
     }
 
     /**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 893e2bd83c..c1d5f56c78 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -348,6 +348,8 @@ public class PythonGateway {
             schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
                     warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
         }
+        // Always release the workflow online first: a schedule can only be set online when its workflow is online
+        processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE);
         schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
     }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
index 400643f3e9..e8b60e5c74 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
@@ -128,12 +128,11 @@ public interface SchedulerService {
      * @param projectCode project code
      * @param id scheduler id
      * @param scheduleStatus schedule status
-     * @return publish result code
      */
-    Map<String, Object> setScheduleState(User loginUser,
-                                         long projectCode,
-                                         Integer id,
-                                         ReleaseState scheduleStatus);
+    void setScheduleState(User loginUser,
+                          long projectCode,
+                          Integer id,
+                          ReleaseState scheduleStatus);
 
     /**
      * query schedule
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index d12216c05a..f546bc4b64 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -426,57 +426,48 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
      */
     @Override
     @Transactional
-    public Map<String, Object> setScheduleState(User loginUser,
-                                                long projectCode,
-                                                Integer id,
-                                                ReleaseState scheduleStatus) {
-        Map<String, Object> result = new HashMap<>();
-
+    public void setScheduleState(User loginUser,
+                                 long projectCode,
+                                 Integer id,
+                                 ReleaseState scheduleStatus) {
         Project project = projectMapper.queryByCode(projectCode);
         // check project auth
-        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null);
-        if (!hasProjectAndPerm) {
-            return result;
-        }
+        projectService.checkProjectAndAuthThrowException(loginUser, project, null);
 
         // check schedule exists
         Schedule scheduleObj = scheduleMapper.selectById(id);
 
         if (scheduleObj == null) {
             logger.error("Schedule does not exist, scheduleId:{}.", id);
-            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
-            return result;
+            throw new ServiceException(Status.SCHEDULE_CRON_NOT_EXISTS, id);
         }
         // check schedule release state
         if (scheduleObj.getReleaseState() == scheduleStatus) {
             logger.warn("Schedule state does not need to change due to schedule state is already {}, scheduleId:{}.",
                     scheduleObj.getReleaseState().getDescp(), scheduleObj.getId());
-            putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
-            return result;
+            throw new ServiceException(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
         }
         ProcessDefinition processDefinition =
                 processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
         if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
             logger.error("Process definition does not exist, processDefinitionCode:{}.",
                     scheduleObj.getProcessDefinitionCode());
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
-            return result;
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
+                    String.valueOf(scheduleObj.getProcessDefinitionCode()));
         }
         List<ProcessTaskRelation> processTaskRelations =
                 processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
         if (processTaskRelations.isEmpty()) {
             logger.error("Process task relations do not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
                     processDefinition.getCode());
-            putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
-            return result;
+            throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY);
         }
         if (scheduleStatus == ReleaseState.ONLINE) {
             // check process definition release state
             if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
                 logger.warn("Only process definition state is {} can change schedule state, processDefinitionCode:{}.",
                         ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
-                putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
-                return result;
+                throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
             }
             // check sub process definition release state
             List<Long> subProcessDefineCodes = new ArrayList<>();
@@ -496,9 +487,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
                             logger.warn(
                                     "Only sub process definition state is {} can change schedule state, subProcessDefinitionCode:{}.",
                                     ReleaseState.ONLINE.getDescp(), subProcessDefinition.getCode());
-                            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
+                            throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE,
                                     String.valueOf(subProcessDefinition.getId()));
-                            return result;
                         }
                     }
                 }
@@ -510,8 +500,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
 
         if (masterServers.isEmpty()) {
             logger.error("Master does not exist.");
-            putMsg(result, Status.MASTER_NOT_EXISTS);
-            return result;
+            throw new ServiceException(Status.MASTER_NOT_EXISTS);
         }
 
         // set status
@@ -532,20 +521,15 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
                     deleteSchedule(project.getId(), id);
                     break;
                 default:
-                    putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
-                    return result;
+                    throw new ServiceException(Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
             }
         } catch (Exception e) {
             logger.error("Set schedule state to {} error, projectCode:{}, scheduleId:{}.", scheduleStatus.getDescp(),
                     projectCode, scheduleObj.getId());
             Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR
                     : Status.OFFLINE_SCHEDULE_ERROR;
-            result.put(Constants.STATUS, status);
             throw new ServiceException(status, e);
         }
-
-        putMsg(result, Status.SUCCESS);
-        return result;
     }
 
     /**
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
index e5d3950ed3..efef2b076b 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
@@ -120,8 +120,9 @@ public class SchedulerControllerTest extends AbstractControllerTest {
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
         paramsMap.add("id", "37");
 
-        Mockito.when(schedulerService.setScheduleState(isA(User.class), isA(Long.class), isA(Integer.class),
-                isA(ReleaseState.class))).thenReturn(success());
+        Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class),
+                isA(Integer.class),
+                isA(ReleaseState.class));
 
         MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/online", 123, 37)
                 .header(SESSION_ID, sessionId)
@@ -140,8 +141,9 @@ public class SchedulerControllerTest extends AbstractControllerTest {
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
         paramsMap.add("id", "28");
 
-        Mockito.when(schedulerService.setScheduleState(isA(User.class), isA(Long.class), isA(Integer.class),
-                isA(ReleaseState.class))).thenReturn(success());
+        Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class),
+                isA(Integer.class),
+                isA(ReleaseState.class));
 
         MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/offline", 123, 28)
                 .header(SESSION_ID, sessionId)
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
index 4de90a8a77..48ddefc6f6 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
@@ -26,10 +26,11 @@ import org.apache.dolphinscheduler.api.dto.schedule.ScheduleUpdateRequest;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl;
-import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.Environment;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.User;
@@ -41,7 +42,8 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -118,7 +120,6 @@ public class SchedulerServiceTest extends BaseServiceTestTool {
 
     @Test
     public void testSetScheduleState() {
-        Map<String, Object> result;
         Project project = getProject();
 
         ProcessDefinition processDefinition = new ProcessDefinition();
@@ -130,40 +131,49 @@ public class SchedulerServiceTest extends BaseServiceTestTool {
         schedule.setReleaseState(ReleaseState.OFFLINE);
 
         Mockito.when(scheduleMapper.selectById(1)).thenReturn(schedule);
-
-        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
-
         Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition);
+        Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
 
-        // hash no auth
-        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
-
-        Mockito.when(projectService.hasProjectAndPerm(user, project, result, null)).thenReturn(true);
         // schedule not exists
-        result = schedulerService.setScheduleState(user, project.getCode(), 2, ReleaseState.ONLINE);
-        Assertions.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS, result.get(Constants.STATUS));
-
-        // SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE
-        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.OFFLINE);
-        Assertions.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, result.get(Constants.STATUS));
+        exception = Assertions.assertThrows(ServiceException.class, () -> {
+            schedulerService.setScheduleState(user, project.getCode(), 2, ReleaseState.ONLINE);
+        });
+        Assertions.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode());
+
+        // SCHEDULE_CRON_RELEASE_NEED_NOT_CHANGE
+        exception = Assertions.assertThrows(ServiceException.class, () -> {
+            schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.OFFLINE);
+        });
+        Assertions.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE.getCode(),
+                ((ServiceException) exception).getCode());
 
         // PROCESS_DEFINE_NOT_EXIST
         schedule.setProcessDefinitionCode(2);
-        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
-        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, result.get(Constants.STATUS));
+        exception = Assertions.assertThrows(ServiceException.class, () -> {
+            schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
+        });
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
         schedule.setProcessDefinitionCode(1);
 
-        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
-        Assertions.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
-
-        processDefinition.setReleaseState(ReleaseState.ONLINE);
-
-        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
-        Assertions.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
+        // PROCESS_DEFINE_NOT_RELEASE: task relations exist, but the process definition is not online yet
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+        processTaskRelationList.add(processTaskRelation);
+        Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1)).thenReturn(processTaskRelationList);
+        exception = Assertions.assertThrows(ServiceException.class, () -> {
+            schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
+        });
+        Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_RELEASE.getCode(), ((ServiceException) exception).getCode());
 
         // SUCCESS
-        result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
-        Assertions.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
+        Server server = new Server();
+        List<Server> serverList = new ArrayList<>();
+        serverList.add(server);
+        Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(serverList);
+        processDefinition.setReleaseState(ReleaseState.ONLINE);
+        Assertions.assertDoesNotThrow(() -> {
+            schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
+        });
     }
 
     @Test
@@ -428,6 +438,7 @@ public class SchedulerServiceTest extends BaseServiceTestTool {
 
     private Project getProject() {
         Project project = new Project();
+        project.setId(1);
         project.setName(projectName);
         project.setCode(projectCode);
         project.setUserId(userId);