You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/30 01:59:28 UTC
[dolphinscheduler] branch dev updated: [fix][python] schedule unexpect offline each time update from api (#13301)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d42f576268 [fix][python] schedule unexpect offline each time update from api (#13301)
d42f576268 is described below
commit d42f576268545f429c22bd3f10e3b64ac0496629
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Fri Dec 30 09:59:16 2022 +0800
[fix][python] schedule unexpect offline each time update from api (#13301)
* Alway set workflow online before set schedule online
* Avoid return map in interface setScheduleState
---
.../api/controller/SchedulerController.java | 10 ++--
.../dolphinscheduler/api/python/PythonGateway.java | 2 +
.../api/service/SchedulerService.java | 9 ++-
.../api/service/impl/SchedulerServiceImpl.java | 44 +++++----------
.../api/controller/SchedulerControllerTest.java | 10 ++--
.../api/service/SchedulerServiceTest.java | 65 +++++++++++++---------
6 files changed, 68 insertions(+), 72 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
index 7c555b23c1..b4e0e8853b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
@@ -193,8 +193,8 @@ public class SchedulerController extends BaseController {
public Result publishScheduleOnline(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
- Map<String, Object> result = schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.ONLINE);
- return returnDataList(result);
+ schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.ONLINE);
+ return Result.success();
}
/**
@@ -215,10 +215,8 @@ public class SchedulerController extends BaseController {
public Result offlineSchedule(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
-
- Map<String, Object> result =
- schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.OFFLINE);
- return returnDataList(result);
+ schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.OFFLINE);
+ return Result.success();
}
/**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 893e2bd83c..c1d5f56c78 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -348,6 +348,8 @@ public class PythonGateway {
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
}
+ // Always set workflow online to set schedule online
+ processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE);
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
index 400643f3e9..e8b60e5c74 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
@@ -128,12 +128,11 @@ public interface SchedulerService {
* @param projectCode project code
* @param id scheduler id
* @param scheduleStatus schedule status
- * @return publish result code
*/
- Map<String, Object> setScheduleState(User loginUser,
- long projectCode,
- Integer id,
- ReleaseState scheduleStatus);
+ void setScheduleState(User loginUser,
+ long projectCode,
+ Integer id,
+ ReleaseState scheduleStatus);
/**
* query schedule
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index d12216c05a..f546bc4b64 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -426,57 +426,48 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
*/
@Override
@Transactional
- public Map<String, Object> setScheduleState(User loginUser,
- long projectCode,
- Integer id,
- ReleaseState scheduleStatus) {
- Map<String, Object> result = new HashMap<>();
-
+ public void setScheduleState(User loginUser,
+ long projectCode,
+ Integer id,
+ ReleaseState scheduleStatus) {
Project project = projectMapper.queryByCode(projectCode);
// check project auth
- boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null);
- if (!hasProjectAndPerm) {
- return result;
- }
+ projectService.checkProjectAndAuthThrowException(loginUser, project, null);
// check schedule exists
Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) {
logger.error("Schedule does not exist, scheduleId:{}.", id);
- putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
- return result;
+ throw new ServiceException(Status.SCHEDULE_CRON_NOT_EXISTS, id);
}
// check schedule release state
if (scheduleObj.getReleaseState() == scheduleStatus) {
logger.warn("Schedule state does not need to change due to schedule state is already {}, scheduleId:{}.",
scheduleObj.getReleaseState().getDescp(), scheduleObj.getId());
- putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
- return result;
+ throw new ServiceException(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
logger.error("Process definition does not exist, processDefinitionCode:{}.",
scheduleObj.getProcessDefinitionCode());
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
- return result;
+ throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
+ String.valueOf(scheduleObj.getProcessDefinitionCode()));
}
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
if (processTaskRelations.isEmpty()) {
logger.error("Process task relations do not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinition.getCode());
- putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
- return result;
+ throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY);
}
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.warn("Only process definition state is {} can change schedule state, processDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
- putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
- return result;
+ throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
}
// check sub process definition release state
List<Long> subProcessDefineCodes = new ArrayList<>();
@@ -496,9 +487,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
logger.warn(
"Only sub process definition state is {} can change schedule state, subProcessDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), subProcessDefinition.getCode());
- putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
+ throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE,
String.valueOf(subProcessDefinition.getId()));
- return result;
}
}
}
@@ -510,8 +500,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
if (masterServers.isEmpty()) {
logger.error("Master does not exist.");
- putMsg(result, Status.MASTER_NOT_EXISTS);
- return result;
+ throw new ServiceException(Status.MASTER_NOT_EXISTS);
}
// set status
@@ -532,20 +521,15 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
deleteSchedule(project.getId(), id);
break;
default:
- putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
- return result;
+ throw new ServiceException(Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
}
} catch (Exception e) {
logger.error("Set schedule state to {} error, projectCode:{}, scheduleId:{}.", scheduleStatus.getDescp(),
projectCode, scheduleObj.getId());
Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR
: Status.OFFLINE_SCHEDULE_ERROR;
- result.put(Constants.STATUS, status);
throw new ServiceException(status, e);
}
-
- putMsg(result, Status.SUCCESS);
- return result;
}
/**
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
index e5d3950ed3..efef2b076b 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
@@ -120,8 +120,9 @@ public class SchedulerControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "37");
- Mockito.when(schedulerService.setScheduleState(isA(User.class), isA(Long.class), isA(Integer.class),
- isA(ReleaseState.class))).thenReturn(success());
+ Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class),
+ isA(Integer.class),
+ isA(ReleaseState.class));
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/online", 123, 37)
.header(SESSION_ID, sessionId)
@@ -140,8 +141,9 @@ public class SchedulerControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "28");
- Mockito.when(schedulerService.setScheduleState(isA(User.class), isA(Long.class), isA(Integer.class),
- isA(ReleaseState.class))).thenReturn(success());
+ Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class),
+ isA(Integer.class),
+ isA(ReleaseState.class));
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/offline", 123, 28)
.header(SESSION_ID, sessionId)
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
index 4de90a8a77..48ddefc6f6 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
@@ -26,10 +26,11 @@ import org.apache.dolphinscheduler.api.dto.schedule.ScheduleUpdateRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl;
-import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -41,7 +42,8 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -118,7 +120,6 @@ public class SchedulerServiceTest extends BaseServiceTestTool {
@Test
public void testSetScheduleState() {
- Map<String, Object> result;
Project project = getProject();
ProcessDefinition processDefinition = new ProcessDefinition();
@@ -130,40 +131,49 @@ public class SchedulerServiceTest extends BaseServiceTestTool {
schedule.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(scheduleMapper.selectById(1)).thenReturn(schedule);
-
- Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
-
Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition);
+ Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
- // hash no auth
- result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
-
- Mockito.when(projectService.hasProjectAndPerm(user, project, result, null)).thenReturn(true);
// schedule not exists
- result = schedulerService.setScheduleState(user, project.getCode(), 2, ReleaseState.ONLINE);
- Assertions.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS, result.get(Constants.STATUS));
-
- // SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE
- result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.OFFLINE);
- Assertions.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, result.get(Constants.STATUS));
+ exception = Assertions.assertThrows(ServiceException.class, () -> {
+ schedulerService.setScheduleState(user, project.getCode(), 2, ReleaseState.ONLINE);
+ });
+ Assertions.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode());
+
+ // SCHEDULE_CRON_RELEASE_NEED_NOT_CHANGE
+ exception = Assertions.assertThrows(ServiceException.class, () -> {
+ schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.OFFLINE);
+ });
+ Assertions.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE.getCode(),
+ ((ServiceException) exception).getCode());
// PROCESS_DEFINE_NOT_EXIST
schedule.setProcessDefinitionCode(2);
- result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
- Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, result.get(Constants.STATUS));
+ exception = Assertions.assertThrows(ServiceException.class, () -> {
+ schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
+ });
+ Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
schedule.setProcessDefinitionCode(1);
- result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
- Assertions.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
-
- processDefinition.setReleaseState(ReleaseState.ONLINE);
-
- result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
- Assertions.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
+ // online also success
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+ processTaskRelationList.add(processTaskRelation);
+ Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1)).thenReturn(processTaskRelationList);
+ exception = Assertions.assertThrows(ServiceException.class, () -> {
+ schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
+ });
+ Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_RELEASE.getCode(), ((ServiceException) exception).getCode());
// SUCCESS
- result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
- Assertions.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
+ Server server = new Server();
+ List<Server> serverList = new ArrayList<>();
+ serverList.add(server);
+ Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(serverList);
+ processDefinition.setReleaseState(ReleaseState.ONLINE);
+ Assertions.assertDoesNotThrow(() -> {
+ schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
+ });
}
@Test
@@ -428,6 +438,7 @@ public class SchedulerServiceTest extends BaseServiceTestTool {
private Project getProject() {
Project project = new Project();
+ project.setId(1);
project.setName(projectName);
project.setCode(projectCode);
project.setUserId(userId);