You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/10/26 11:18:42 UTC
[dolphinscheduler] branch dev updated: [Refactor] Migrate all workergroup-related interface functions from ProcessServiceImpl (#12493)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ed209bdf82 [Refactor] Migrate all workergroup-related interface functions from ProcessServiceImpl (#12493)
ed209bdf82 is described below
commit ed209bdf82d855a804b68d8833f20595e86bb069
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Wed Oct 26 19:18:32 2022 +0800
[Refactor] Migrate all workergroup-related interface functions from ProcessServiceImpl (#12493)
* Migrate all workergroup-related interface functions from ProcessServiceImpl
---
.../api/service/WorkerGroupService.java | 29 +-
.../api/service/impl/ExecutorServiceImpl.java | 6 +-
.../api/service/impl/WorkerGroupServiceImpl.java | 39 +++
.../api/service/ExecutorServiceTest.java | 5 +-
.../api/service/WorkerGroupServiceTest.java | 313 ++++++++++++++++-----
.../service/process/ProcessService.java | 4 -
.../service/process/ProcessServiceImpl.java | 37 ---
7 files changed, 320 insertions(+), 113 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
index b2f9d78178..2c87e4be2e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
@@ -18,8 +18,10 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
+import java.util.List;
import java.util.Map;
/**
@@ -28,7 +30,7 @@ import java.util.Map;
public interface WorkerGroupService {
/**
- * create or update a worker group
+ * Create or update a worker group
*
* @param loginUser login user
* @param id worker group id
@@ -42,7 +44,7 @@ public interface WorkerGroupService {
String otherParamsJson);
/**
- * query worker group paging
+ * Query worker group paging
*
* @param loginUser login user
* @param pageNo page number
@@ -53,25 +55,40 @@ public interface WorkerGroupService {
Result queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal);
/**
- * query all worker group
+ * Query all worker group
*
- * @param loginUser
+ * @param loginUser login user
* @return all worker group list
*/
Map<String, Object> queryAllGroup(User loginUser);
/**
- * delete worker group by id
+ * Delete worker group by id
+ * @param loginUser login user
* @param id worker group id
* @return delete result code
*/
Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id);
/**
- * query all worker address list
+ * Query all worker address list
*
* @return all worker address list
*/
Map<String, Object> getWorkerAddressList();
+ /**
+ * Get task instance's worker group.
+ * The implementation added in this change (WorkerGroupServiceImpl) returns the task instance's own
+ * worker group when it is not empty, otherwise the worker group of the owning process instance, and
+ * the default worker group when that process instance cannot be found. A null task instance yields null.
+ *
+ * @param taskInstance task instance, may be null
+ * @return worker group name, or null when taskInstance is null
+ */
+ String getTaskWorkerGroup(TaskInstance taskInstance);
+
+ /**
+ * Query worker group by process definition codes.
+ * The implementation added in this change reads the schedules of the given process definitions and
+ * maps each schedule's process definition code to that schedule's worker group.
+ *
+ * @param processDefinitionCodeList codes of the process definitions to look up
+ * @return map from process definition code to worker group
+ */
+ Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList);
+
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 6681d12038..bc885f1b82 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
@@ -150,6 +151,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private TaskGroupQueueMapper taskGroupQueueMapper;
+ @Autowired
+ private WorkerGroupService workerGroupService;
+
/**
* execute process instance
*
@@ -1030,7 +1034,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
.collect(Collectors.toList());
Map<Long, String> processDefinitionWorkerGroupMap =
- processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
+ workerGroupService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 66b1f5203c..860c410b1f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -29,10 +29,14 @@ import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections.CollectionUtils;
@@ -74,6 +78,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Autowired
private RegistryClient registryClient;
+ @Autowired
+ private ProcessService processService;
+
+ @Autowired
+ private ScheduleMapper scheduleMapper;
+
/**
* create or update a worker group
*
@@ -354,4 +364,33 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
return result;
}
+    /**
+     * Resolve the worker group of a task instance.
+     * Order of precedence: the task instance's own worker group (when not empty), then the worker group
+     * of its process instance, then {@code Constants.DEFAULT_WORKER_GROUP} when no process instance is found.
+     *
+     * @param taskInstance task instance, may be null
+     * @return resolved worker group, or null when taskInstance is null
+     */
+    @Override
+    public String getTaskWorkerGroup(TaskInstance taskInstance) {
+        if (taskInstance == null) {
+            return null;
+        }
+
+        String taskLevelGroup = taskInstance.getWorkerGroup();
+        if (StringUtils.isNotEmpty(taskLevelGroup)) {
+            return taskLevelGroup;
+        }
+
+        // The task carries no worker group of its own, so fall back to its process instance.
+        ProcessInstance owner = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
+        if (owner == null) {
+            logger.info("task : {} will use default worker group", taskInstance.getId());
+            return Constants.DEFAULT_WORKER_GROUP;
+        }
+        return owner.getWorkerGroup();
+    }
+
+    /**
+     * Look up the worker group configured on the schedules of the given process definitions.
+     *
+     * @param processDefinitionCodeList codes of the process definitions to look up
+     * @return map from process definition code to the worker group of its schedule
+     */
+    @Override
+    public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
+        List<Schedule> processDefinitionScheduleList =
+                scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
+        // The two-argument toMap throws IllegalStateException on a duplicate key. When several schedules
+        // share one process definition code, keep the first one returned by the mapper instead of failing.
+        return processDefinitionScheduleList.stream().collect(Collectors.toMap(
+                Schedule::getProcessDefinitionCode,
+                Schedule::getWorkerGroup,
+                (first, duplicate) -> first));
+    }
+
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index dd37a7cdd9..7b01b488c8 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -106,6 +106,9 @@ public class ExecutorServiceTest {
@Mock
private CommandService commandService;
+ @Mock
+ private WorkerGroupService workerGroupService;
+
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@@ -289,7 +292,7 @@ public class ExecutorServiceTest {
Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP);
- Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
+ Mockito.when(workerGroupService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
.thenReturn(processDefinitionWorkerGroupMap);
Command command = new Command();
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index 168f4e78e2..987c718997 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -17,129 +17,314 @@
package org.apache.dolphinscheduler.api.service;
-import org.apache.dolphinscheduler.api.ApiApplicationServer;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_CREATE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE;
+import static org.mockito.ArgumentMatchers.any;
+
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
+import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl;
+import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.enums.ProfileType;
+import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
import org.mockito.Mockito;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.test.context.ActiveProfiles;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-@ActiveProfiles(value = {ProfileType.H2})
-@SpringBootTest(classes = ApiApplicationServer.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
public class WorkerGroupServiceTest {
- @MockBean(name = "registryClient")
- private RegistryClient registryClient;
+ private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceTest.class);
+
+ private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class);
- @Autowired
+ private static final Logger serviceLogger = LoggerFactory.getLogger(WorkerGroupService.class);
+
+ @InjectMocks
private WorkerGroupServiceImpl workerGroupService;
- @MockBean(name = "workerGroupMapper")
+ @Mock
private WorkerGroupMapper workerGroupMapper;
- @MockBean(name = "processInstanceMapper")
+ @Mock
private ProcessInstanceMapper processInstanceMapper;
- private String groupName = "groupName000001";
+ @Mock
+ private ProcessService processService;
+
+ @Mock
+ private RegistryClient registryClient;
+
+ @Mock
+ private ResourcePermissionCheckService resourcePermissionCheckService;
+
+ private final String GROUP_NAME = "testWorkerGroup";
+
+    /**
+     * Build the general (non-admin) user with id 1 that the tests log in with.
+     */
+    private User getLoginUser() {
+        User user = new User();
+        user.setId(1);
+        user.setUserName("workerGroupTestUser");
+        user.setUserType(UserType.GENERAL_USER);
+        return user;
+    }
+
+    /**
+     * With both permission checks stubbed to deny, saving must report {@code USER_NO_OPERATION_PERM}.
+     */
+    @Test
+    public void giveNoPermission_whenSaveWorkerGroup_expectNoOperation() {
+        User user = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(false);
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(false);
+
+        Map<String, Object> saveResult =
+                workerGroupService.saveWorkerGroup(user, 1, GROUP_NAME, "localhost:0000", "test group", "");
+
+        Status actualStatus = (Status) saveResult.get(Constants.STATUS);
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), actualStatus.getCode());
+    }
+
+    /**
+     * With both permission checks passing, an empty worker group name must report {@code NAME_NULL}.
+     */
+    @Test
+    public void giveNullName_whenSaveWorkerGroup_expectNAME_NULL() {
+        User user = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
+
+        String emptyName = "";
+        Map<String, Object> saveResult =
+                workerGroupService.saveWorkerGroup(user, 1, emptyName, "localhost:0000", "test group", "");
+
+        Status actualStatus = (Status) saveResult.get(Constants.STATUS);
+        Assertions.assertEquals(Status.NAME_NULL.getCode(), actualStatus.getCode());
+    }
- private User loginUSer;
+    /**
+     * Saving under a name for which the mapper already returns a worker group must report {@code NAME_EXIST}.
+     * Despite the method name, the clashing name is the worker group name, not a user name.
+     */
+    @Test
+    public void giveSameUserName_whenSaveWorkerGroup_expectNAME_EXIST() {
+        User user = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
+
+        // No group with id 1 yet, but one with the same name is already stored.
+        Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
+        List<WorkerGroup> sameNameGroups = new ArrayList<>();
+        sameNameGroups.add(getWorkerGroup(1));
+        Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(sameNameGroups);
+
+        Map<String, Object> saveResult =
+                workerGroupService.saveWorkerGroup(user, 1, GROUP_NAME, "localhost:0000", "test group", "");
+
+        Status actualStatus = (Status) saveResult.get(Constants.STATUS);
+        Assertions.assertEquals(Status.NAME_EXIST.getCode(), actualStatus.getCode());
+    }
+
+    /**
+     * Saving with an address that is absent from the registry's worker map must report
+     * {@code WORKER_ADDRESS_INVALID}.
+     */
+    @Test
+    public void giveInvalidAddress_whenSaveWorkerGroup_expectADDRESS_INVALID() {
+        User user = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
+        Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
+        Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
+
+        String groupRegistryPath =
+                Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + GROUP_NAME;
+        Mockito.when(registryClient.exists(groupRegistryPath)).thenReturn(false);
+
+        // The registry only knows "localhost1:0000", which differs from the address being saved.
+        Map<String, String> registeredWorkers = new HashMap<>();
+        registeredWorkers.put("localhost1:0000", "");
+        Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(registeredWorkers);
+
+        Map<String, Object> saveResult =
+                workerGroupService.saveWorkerGroup(user, 1, GROUP_NAME, "localhost:0000", "test group", "");
+
+        Status actualStatus = (Status) saveResult.get(Constants.STATUS);
+        Assertions.assertEquals(Status.WORKER_ADDRESS_INVALID.getCode(), actualStatus.getCode());
+    }
+
+    /**
+     * With permissions granted, no clashing group, and the address present in the registry's worker map,
+     * saving must report {@code SUCCESS}.
+     */
+    @Test
+    public void giveValidWorkerGroup_whenSaveWorkerGroup_expectSuccess() {
+        User user = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
+
+        Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
+        Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
+
+        String groupRegistryPath =
+                Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + GROUP_NAME;
+        Mockito.when(registryClient.exists(groupRegistryPath)).thenReturn(false);
+
+        Map<String, String> registeredWorkers = new HashMap<>();
+        registeredWorkers.put("localhost:0000", "");
+        Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(registeredWorkers);
+        Mockito.when(workerGroupMapper.insert(any())).thenReturn(1);
+
+        Map<String, Object> saveResult =
+                workerGroupService.saveWorkerGroup(user, 1, GROUP_NAME, "localhost:0000", "test group", "");
+
+        Status actualStatus = (Status) saveResult.get(Constants.STATUS);
+        Assertions.assertEquals(Status.SUCCESS.getCode(), actualStatus.getCode());
+    }
+
+    /**
+     * A paging query by a general user who owns one worker group is expected to report {@code SUCCESS}.
+     */
+    @Test
+    public void giveValidParams_whenQueryAllGroupPaging_expectSuccess() {
+        User loginUser = getLoginUser();
+        Set<Integer> ids = new HashSet<>();
+        ids.add(1);
+        List<WorkerGroup> workerGroups = new ArrayList<>();
+        workerGroups.add(getWorkerGroup(1));
+        // NOTE(review): serviceLogger is created from the WorkerGroupService interface. If the implementation
+        // passes a logger of a different class, this stub never matches (strictness is LENIENT) - confirm.
+        Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP,
+                loginUser.getId(), serviceLogger)).thenReturn(ids);
+        Mockito.when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups);
+        Set<String> activeWorkerNodes = new HashSet<>();
+        activeWorkerNodes.add("localhost:12345");
+        activeWorkerNodes.add("localhost:23456");
+        Mockito.when(registryClient.getServerNodeSet(NodeType.WORKER)).thenReturn(activeWorkerNodes);
- @BeforeEach
- public void init() {
- loginUSer = new User();
- loginUSer.setUserType(UserType.ADMIN_USER);
+        Result result = workerGroupService.queryAllGroupPaging(loginUser, 1, 1, null);
+        // JUnit's contract is assertEquals(expected, actual); the arguments were previously swapped, which
+        // would produce a misleading failure message.
+        Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode());
     }
@Test
public void testQueryAllGroup() {
- Map<String, Object> result = workerGroupService.queryAllGroup(loginUSer);
+ // Uses a freshly built general user instead of the removed shared loginUSer field.
+ // NOTE(review): no mapper or registry stubs are set up here; the single expected entry is presumably
+ // a group the service adds itself - confirm against WorkerGroupServiceImpl.queryAllGroup.
+ Map<String, Object> result = workerGroupService.queryAllGroup(getLoginUser());
List<String> workerGroups = (List<String>) result.get(Constants.DATA_LIST);
Assertions.assertEquals(workerGroups.size(), 1);
}
- /**
- * delete group by id
- */
@Test
- public void testDeleteWorkerGroupById() {
- User user = new User();
- user.setId(1);
- user.setUserType(UserType.ADMIN_USER);
- WorkerGroup wg2 = getWorkerGroup(2);
- Mockito.when(workerGroupMapper.selectById(2)).thenReturn(wg2);
- Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg2.getName(),
- org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES))
- .thenReturn(getProcessInstanceList());
- Map<String, Object> result = workerGroupService.deleteWorkerGroupById(user, 1);
+ public void giveNotExistsWorkerGroup_whenDeleteWorkerGroupById_expectNotExists() {
+ // Permission checks pass, but the mapper finds no worker group with id 1, so the delete is
+ // expected to report DELETE_WORKER_GROUP_NOT_EXIST.
+ User loginUser = getLoginUser();
+ Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+ WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
+ Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+ baseServiceLogger)).thenReturn(true);
+ Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
+
+ Map<String, Object> notExistResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(),
- ((Status) result.get(Constants.STATUS)).getCode());
- result = workerGroupService.deleteWorkerGroupById(user, 2);
- Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(),
- ((Status) result.get(Constants.STATUS)).getCode());
- // correct
- WorkerGroup wg3 = getWorkerGroup(3);
- Mockito.when(workerGroupMapper.selectById(3)).thenReturn(wg3);
- Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg3.getName(),
+ ((Status) notExistResult.get(Constants.STATUS)).getCode());
+ }
+
+ @Test
+ public void giveRunningProcess_whenDeleteWorkerGroupById_expectFailed() {
+ // The worker group exists, but the mapper reports a process instance in a not-terminated state
+ // for it, so the delete is expected to report DELETE_WORKER_GROUP_BY_ID_FAIL.
+ User loginUser = getLoginUser();
+ Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+ WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
+ Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+ baseServiceLogger)).thenReturn(true);
+ WorkerGroup workerGroup = getWorkerGroup(1);
+ Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ List<ProcessInstance> processInstances = new ArrayList<ProcessInstance>();
+ processInstances.add(processInstance);
+ Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES))
- .thenReturn(new ArrayList<>());
- result = workerGroupService.deleteWorkerGroupById(user, 3);
- Assertions.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG));
+ .thenReturn(processInstances);
+
+ Map<String, Object> deleteFailed = workerGroupService.deleteWorkerGroupById(loginUser, 1);
+ Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(),
+ ((Status) deleteFailed.get(Constants.STATUS)).getCode());
}
- /**
- * get processInstances
- */
- private List<ProcessInstance> getProcessInstanceList() {
- List<ProcessInstance> processInstances = new ArrayList<>();
- processInstances.add(new ProcessInstance());
- return processInstances;
+    /**
+     * With permissions granted, the worker group present, and no unfinished process instances reported,
+     * deleting must report {@code SUCCESS}.
+     */
+    @Test
+    public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() {
+        User user = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
+
+        WorkerGroup storedGroup = getWorkerGroup(1);
+        String storedGroupName = storedGroup.getName();
+        Mockito.when(workerGroupMapper.selectById(1)).thenReturn(storedGroup);
+        Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(storedGroupName,
+                org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)).thenReturn(null);
+        Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1);
+        Mockito.when(processInstanceMapper.updateProcessInstanceByWorkerGroupName(storedGroupName, ""))
+                .thenReturn(1);
+
+        Map<String, Object> deleteResult = workerGroupService.deleteWorkerGroupById(user, 1);
+
+        Status actualStatus = (Status) deleteResult.get(Constants.STATUS);
+        Assertions.assertEquals(Status.SUCCESS.getCode(), actualStatus.getCode());
     }
@Test
public void testQueryAllGroupWithDefault() {
- Map<String, Object> result = workerGroupService.queryAllGroup(loginUSer);
+ // Uses a freshly built general user instead of the removed shared loginUSer field; the only group
+ // expected in the result is the one named "default".
+ Map<String, Object> result = workerGroupService.queryAllGroup(getLoginUser());
List<String> workerGroups = (List<String>) result.get(Constants.DATA_LIST);
Assertions.assertEquals(1, workerGroups.size());
Assertions.assertEquals("default", workerGroups.toArray()[0]);
}
+    /**
+     * A null task instance resolves to a null worker group.
+     */
+    @Test
+    public void giveNull_whenGetTaskWorkerGroup_expectNull() {
+        Assertions.assertNull(workerGroupService.getTaskWorkerGroup(null));
+    }
+
+    /**
+     * A task instance that carries its own worker group resolves to exactly that group.
+     */
+    @Test
+    public void giveCorrectTaskInstance_whenGetTaskWorkerGroup_expectTaskWorkerGroup() {
+        String expectedGroup = "cluster1";
+        TaskInstance task = new TaskInstance();
+        task.setId(1);
+        task.setWorkerGroup(expectedGroup);
+
+        Assertions.assertEquals(expectedGroup, workerGroupService.getTaskWorkerGroup(task));
+    }
+
+    /**
+     * A task instance without a worker group of its own resolves to the worker group of its process instance.
+     */
+    @Test
+    public void giveNullWorkerGroup_whenGetTaskWorkerGroup_expectProcessWorkerGroup() {
+        String expectedGroup = "cluster1";
+
+        ProcessInstance owner = new ProcessInstance();
+        owner.setId(1);
+        owner.setWorkerGroup(expectedGroup);
+        Mockito.when(processService.findProcessInstanceById(1)).thenReturn(owner);
+
+        TaskInstance task = new TaskInstance();
+        task.setId(1);
+        task.setProcessInstanceId(1);
+
+        Assertions.assertEquals(expectedGroup, workerGroupService.getTaskWorkerGroup(task));
+    }
+
+    /**
+     * When the task instance has no worker group and its process instance cannot be found, the default
+     * worker group is returned.
+     */
+    @Test
+    public void giveNullTaskAndProcessWorkerGroup_whenGetTaskWorkerGroup_expectDefault() {
+        Mockito.when(processService.findProcessInstanceById(1)).thenReturn(null);
+
+        TaskInstance task = new TaskInstance();
+        task.setId(1);
+        task.setProcessInstanceId(1);
+
+        String resolvedGroup = workerGroupService.getTaskWorkerGroup(task);
+        Assertions.assertEquals(Constants.DEFAULT_WORKER_GROUP, resolvedGroup);
+    }
+
/**
* get Group
*/
private WorkerGroup getWorkerGroup(int id) {
WorkerGroup workerGroup = new WorkerGroup();
- workerGroup.setName(groupName);
+ // Every fixture carries the shared GROUP_NAME; only the id differs between callers.
+ workerGroup.setName(GROUP_NAME);
workerGroup.setId(id);
return workerGroup;
}
- private WorkerGroup getWorkerGroup() {
- return getWorkerGroup(1);
- }
-
- private List<WorkerGroup> getList() {
- List<WorkerGroup> list = new ArrayList<>();
- list.add(getWorkerGroup());
- return list;
- }
-
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 76a6e6ea52..74989430e4 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -119,8 +119,6 @@ public interface ProcessService {
List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode);
- Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList);
-
List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode);
List<ProcessInstance> queryNeedFailoverProcessInstances(String host);
@@ -150,8 +148,6 @@ public interface ProcessService {
ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId);
- String getTaskWorkerGroup(TaskInstance taskInstance);
-
List<Project> getProjectListHavePerm(int userId);
<T> List<T> listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 5c6a149657..70d4c52504 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1570,20 +1570,6 @@ public class ProcessServiceImpl implements ProcessService {
return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
}
- /**
- * query Schedule by processDefinitionCode
- *
- * @param processDefinitionCodeList processDefinitionCodeList
- * @see Schedule
- */
- @Override
- public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
- List<Schedule> processDefinitionScheduleList =
- scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
- return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
- Schedule::getWorkerGroup));
- }
-
/**
* query dependent process definition by process definition code
*
@@ -1797,29 +1783,6 @@ public class ProcessServiceImpl implements ProcessService {
return projectMapper.queryProjectWithUserByProcessInstanceId(processInstanceId);
}
- /**
- * get task worker group
- *
- * @param taskInstance taskInstance
- * @return workerGroupId
- */
- @Override
- public String getTaskWorkerGroup(TaskInstance taskInstance) {
- String workerGroup = taskInstance.getWorkerGroup();
-
- if (!Strings.isNullOrEmpty(workerGroup)) {
- return workerGroup;
- }
- int processInstanceId = taskInstance.getProcessInstanceId();
- ProcessInstance processInstance = findProcessInstanceById(processInstanceId);
-
- if (processInstance != null) {
- return processInstance.getWorkerGroup();
- }
- logger.info("task : {} will use default worker group", taskInstance.getId());
- return Constants.DEFAULT_WORKER_GROUP;
- }
-
/**
* get have perm project list
*