You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/10/26 11:18:42 UTC

[dolphinscheduler] branch dev updated: [Refactor] Migrate all workergroup-related interface functions from ProcessServiceImpl (#12493)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ed209bdf82 [Refactor] Migrate all workergroup-related interface functions from ProcessServiceImpl (#12493)
ed209bdf82 is described below

commit ed209bdf82d855a804b68d8833f20595e86bb069
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Wed Oct 26 19:18:32 2022 +0800

    [Refactor] Migrate all workergroup-related interface functions from ProcessServiceImpl (#12493)
    
    * Migrate all workergroup-related interface functions from ProcessServiceImpl
---
 .../api/service/WorkerGroupService.java            |  29 +-
 .../api/service/impl/ExecutorServiceImpl.java      |   6 +-
 .../api/service/impl/WorkerGroupServiceImpl.java   |  39 +++
 .../api/service/ExecutorServiceTest.java           |   5 +-
 .../api/service/WorkerGroupServiceTest.java        | 313 ++++++++++++++++-----
 .../service/process/ProcessService.java            |   4 -
 .../service/process/ProcessServiceImpl.java        |  37 ---
 7 files changed, 320 insertions(+), 113 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
index b2f9d78178..2c87e4be2e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
@@ -18,8 +18,10 @@
 package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -28,7 +30,7 @@ import java.util.Map;
 public interface WorkerGroupService {
 
     /**
-     * create or update a worker group
+     * Create or update a worker group
      *
      * @param loginUser login user
      * @param id worker group id
@@ -42,7 +44,7 @@ public interface WorkerGroupService {
                                         String otherParamsJson);
 
     /**
-     * query worker group paging
+     * Query worker group paging
      *
      * @param loginUser login user
      * @param pageNo page number
@@ -53,25 +55,40 @@ public interface WorkerGroupService {
     Result queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal);
 
     /**
-     * query all worker group
+     * Query all worker group
      *
-     * @param loginUser
+     * @param loginUser login user
      * @return all worker group list
      */
     Map<String, Object> queryAllGroup(User loginUser);
 
     /**
-     * delete worker group by id
+     * Delete worker group by id
+     * @param loginUser login user
      * @param id worker group id
      * @return delete result code
      */
     Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id);
 
     /**
-     * query all worker address list
+     * Query all worker address list
      *
      * @return all worker address list
      */
     Map<String, Object> getWorkerAddressList();
 
+    /**
+     * Get task instance's worker group
+     * @param taskInstance task instance
+     * @return worker group
+     */
+    String getTaskWorkerGroup(TaskInstance taskInstance);
+
+    /**
+     * Query worker group by process definition codes
+     * @param processDefinitionCodeList processDefinitionCodeList
+     * @return worker group map
+     */
+    Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList);
+
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 6681d12038..bc885f1b82 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.service.MonitorService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.WorkerGroupService;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
@@ -150,6 +151,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     @Autowired
     private TaskGroupQueueMapper taskGroupQueueMapper;
 
+    @Autowired
+    private WorkerGroupService workerGroupService;
+
     /**
      * execute process instance
      *
@@ -1030,7 +1034,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                         .collect(Collectors.toList());
 
         Map<Long, String> processDefinitionWorkerGroupMap =
-                processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
+                workerGroupService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
 
         for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
             if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 66b1f5203c..860c410b1f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -29,10 +29,14 @@ import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -74,6 +78,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
     @Autowired
     private RegistryClient registryClient;
 
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private ScheduleMapper scheduleMapper;
+
     /**
      * create or update a worker group
      *
@@ -354,4 +364,33 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
         return result;
     }
 
+    @Override
+    public String getTaskWorkerGroup(TaskInstance taskInstance) {
+        if (taskInstance == null) {
+            return null;
+        }
+
+        String workerGroup = taskInstance.getWorkerGroup();
+
+        if (StringUtils.isNotEmpty(workerGroup)) {
+            return workerGroup;
+        }
+        int processInstanceId = taskInstance.getProcessInstanceId();
+        ProcessInstance processInstance = processService.findProcessInstanceById(processInstanceId);
+
+        if (processInstance != null) {
+            return processInstance.getWorkerGroup();
+        }
+        logger.info("task : {} will use default worker group", taskInstance.getId());
+        return Constants.DEFAULT_WORKER_GROUP;
+    }
+
+    @Override
+    public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
+        List<Schedule> processDefinitionScheduleList =
+                scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
+        return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
+                Schedule::getWorkerGroup));
+    }
+
 }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index dd37a7cdd9..7b01b488c8 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -106,6 +106,9 @@ public class ExecutorServiceTest {
     @Mock
     private CommandService commandService;
 
+    @Mock
+    private WorkerGroupService workerGroupService;
+
     @Mock
     private ProcessDefinitionMapper processDefinitionMapper;
 
@@ -289,7 +292,7 @@ public class ExecutorServiceTest {
 
         Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
         processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP);
-        Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
+        Mockito.when(workerGroupService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
                 .thenReturn(processDefinitionWorkerGroupMap);
 
         Command command = new Command();
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index 168f4e78e2..987c718997 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -17,129 +17,314 @@
 
 package org.apache.dolphinscheduler.api.service;
 
-import org.apache.dolphinscheduler.api.ApiApplicationServer;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_CREATE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE;
+import static org.mockito.ArgumentMatchers.any;
+
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
+import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl;
+import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.enums.ProfileType;
+import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.test.context.ActiveProfiles;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-@ActiveProfiles(value = {ProfileType.H2})
-@SpringBootTest(classes = ApiApplicationServer.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
 public class WorkerGroupServiceTest {
 
-    @MockBean(name = "registryClient")
-    private RegistryClient registryClient;
+    private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceTest.class);
+
+    private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class);
 
-    @Autowired
+    private static final Logger serviceLogger = LoggerFactory.getLogger(WorkerGroupService.class);
+
+    @InjectMocks
     private WorkerGroupServiceImpl workerGroupService;
 
-    @MockBean(name = "workerGroupMapper")
+    @Mock
     private WorkerGroupMapper workerGroupMapper;
 
-    @MockBean(name = "processInstanceMapper")
+    @Mock
     private ProcessInstanceMapper processInstanceMapper;
 
-    private String groupName = "groupName000001";
+    @Mock
+    private ProcessService processService;
+
+    @Mock
+    private RegistryClient registryClient;
+
+    @Mock
+    private ResourcePermissionCheckService resourcePermissionCheckService;
+
+    private final String GROUP_NAME = "testWorkerGroup";
+
+    private User getLoginUser() {
+        User loginUser = new User();
+        loginUser.setUserType(UserType.GENERAL_USER);
+        loginUser.setUserName("workerGroupTestUser");
+        loginUser.setId(1);
+        return loginUser;
+    }
+
+    @Test
+    public void giveNoPermission_whenSaveWorkerGroup_expectNoOperation() {
+        User loginUser = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(false);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(false);
+        Map<String, Object> result =
+                workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
+        Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(),
+                ((Status) result.get(Constants.STATUS)).getCode());
+    }
+
+    @Test
+    public void giveNullName_whenSaveWorkerGroup_expectNAME_NULL() {
+        User loginUser = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        Map<String, Object> result =
+                workerGroupService.saveWorkerGroup(loginUser, 1, "", "localhost:0000", "test group", "");
+        Assertions.assertEquals(Status.NAME_NULL.getCode(),
+                ((Status) result.get(Constants.STATUS)).getCode());
+    }
 
-    private User loginUSer;
+    @Test
+    public void giveSameUserName_whenSaveWorkerGroup_expectNAME_EXIST() {
+        User loginUser = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
+        List<WorkerGroup> workerGroupList = new ArrayList<WorkerGroup>();
+        workerGroupList.add(getWorkerGroup(1));
+        Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(workerGroupList);
+
+        Map<String, Object> result =
+                workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
+        Assertions.assertEquals(Status.NAME_EXIST.getCode(),
+                ((Status) result.get(Constants.STATUS)).getCode());
+    }
+
+    @Test
+    public void giveInvalidAddress_whenSaveWorkerGroup_expectADDRESS_INVALID() {
+        User loginUser = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
+        Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
+        String workerGroupPath =
+                Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + GROUP_NAME;
+        Mockito.when(registryClient.exists(workerGroupPath)).thenReturn(false);
+        Map<String, String> serverMaps = new HashMap<>();
+        serverMaps.put("localhost1:0000", "");
+        Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps);
+
+        Map<String, Object> result =
+                workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
+        Assertions.assertEquals(Status.WORKER_ADDRESS_INVALID.getCode(),
+                ((Status) result.get(Constants.STATUS)).getCode());
+    }
+
+    @Test
+    public void giveValidWorkerGroup_whenSaveWorkerGroup_expectSuccess() {
+        User loginUser = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+
+        Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
+        Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
+        String workerGroupPath =
+                Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + GROUP_NAME;
+        Mockito.when(registryClient.exists(workerGroupPath)).thenReturn(false);
+        Map<String, String> serverMaps = new HashMap<>();
+        serverMaps.put("localhost:0000", "");
+        Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps);
+        Mockito.when(workerGroupMapper.insert(any())).thenReturn(1);
+
+        Map<String, Object> result =
+                workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
+        Assertions.assertEquals(Status.SUCCESS.getCode(),
+                ((Status) result.get(Constants.STATUS)).getCode());
+    }
+
+    @Test
+    public void giveValidParams_whenQueryAllGroupPaging_expectSuccess() {
+        User loginUser = getLoginUser();
+        Set<Integer> ids = new HashSet<>();
+        ids.add(1);
+        List<WorkerGroup> workerGroups = new ArrayList<>();
+        workerGroups.add(getWorkerGroup(1));
+        Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP,
+                loginUser.getId(), serviceLogger)).thenReturn(ids);
+        Mockito.when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups);
+        Set<String> activeWorkerNodes = new HashSet<>();
+        activeWorkerNodes.add("localhost:12345");
+        activeWorkerNodes.add("localhost:23456");
+        Mockito.when(registryClient.getServerNodeSet(NodeType.WORKER)).thenReturn(activeWorkerNodes);
 
-    @BeforeEach
-    public void init() {
-        loginUSer = new User();
-        loginUSer.setUserType(UserType.ADMIN_USER);
+        Result result = workerGroupService.queryAllGroupPaging(loginUser, 1, 1, null);
+        Assertions.assertEquals(result.getCode(), Status.SUCCESS.getCode());
     }
 
     @Test
     public void testQueryAllGroup() {
-        Map<String, Object> result = workerGroupService.queryAllGroup(loginUSer);
+        Map<String, Object> result = workerGroupService.queryAllGroup(getLoginUser());
         List<String> workerGroups = (List<String>) result.get(Constants.DATA_LIST);
         Assertions.assertEquals(workerGroups.size(), 1);
     }
 
-    /**
-     * delete group by id
-     */
     @Test
-    public void testDeleteWorkerGroupById() {
-        User user = new User();
-        user.setId(1);
-        user.setUserType(UserType.ADMIN_USER);
-        WorkerGroup wg2 = getWorkerGroup(2);
-        Mockito.when(workerGroupMapper.selectById(2)).thenReturn(wg2);
-        Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg2.getName(),
-                org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES))
-                .thenReturn(getProcessInstanceList());
-        Map<String, Object> result = workerGroupService.deleteWorkerGroupById(user, 1);
+    public void giveNotExistsWorkerGroup_whenDeleteWorkerGroupById_expectNotExists() {
+        User loginUser = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
+
+        Map<String, Object> notExistResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
         Assertions.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(),
-                ((Status) result.get(Constants.STATUS)).getCode());
-        result = workerGroupService.deleteWorkerGroupById(user, 2);
-        Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(),
-                ((Status) result.get(Constants.STATUS)).getCode());
-        // correct
-        WorkerGroup wg3 = getWorkerGroup(3);
-        Mockito.when(workerGroupMapper.selectById(3)).thenReturn(wg3);
-        Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg3.getName(),
+                ((Status) notExistResult.get(Constants.STATUS)).getCode());
+    }
+
+    @Test
+    public void giveRunningProcess_whenDeleteWorkerGroupById_expectFailed() {
+        User loginUser = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        WorkerGroup workerGroup = getWorkerGroup(1);
+        Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        List<ProcessInstance> processInstances = new ArrayList<ProcessInstance>();
+        processInstances.add(processInstance);
+        Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
                 org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES))
-                .thenReturn(new ArrayList<>());
-        result = workerGroupService.deleteWorkerGroupById(user, 3);
-        Assertions.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG));
+                .thenReturn(processInstances);
+
+        Map<String, Object> deleteFailed = workerGroupService.deleteWorkerGroupById(loginUser, 1);
+        Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(),
+                ((Status) deleteFailed.get(Constants.STATUS)).getCode());
     }
 
-    /**
-     * get processInstances
-     */
-    private List<ProcessInstance> getProcessInstanceList() {
-        List<ProcessInstance> processInstances = new ArrayList<>();
-        processInstances.add(new ProcessInstance());
-        return processInstances;
+    @Test
+    public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() {
+        User loginUser = getLoginUser();
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
+                baseServiceLogger)).thenReturn(true);
+        WorkerGroup workerGroup = getWorkerGroup(1);
+        Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
+        Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
+                org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)).thenReturn(null);
+        Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1);
+        Mockito.when(processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), ""))
+                .thenReturn(1);
+
+        Map<String, Object> successResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
+        Assertions.assertEquals(Status.SUCCESS.getCode(),
+                ((Status) successResult.get(Constants.STATUS)).getCode());
     }
 
     @Test
     public void testQueryAllGroupWithDefault() {
-        Map<String, Object> result = workerGroupService.queryAllGroup(loginUSer);
+        Map<String, Object> result = workerGroupService.queryAllGroup(getLoginUser());
         List<String> workerGroups = (List<String>) result.get(Constants.DATA_LIST);
         Assertions.assertEquals(1, workerGroups.size());
         Assertions.assertEquals("default", workerGroups.toArray()[0]);
     }
 
+    @Test
+    public void giveNull_whenGetTaskWorkerGroup_expectNull() {
+        String nullWorkerGroup = workerGroupService.getTaskWorkerGroup(null);
+        Assertions.assertNull(nullWorkerGroup);
+    }
+
+    @Test
+    public void giveCorrectTaskInstance_whenGetTaskWorkerGroup_expectTaskWorkerGroup() {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setWorkerGroup("cluster1");
+
+        String workerGroup = workerGroupService.getTaskWorkerGroup(taskInstance);
+        Assertions.assertEquals("cluster1", workerGroup);
+    }
+
+    @Test
+    public void giveNullWorkerGroup_whenGetTaskWorkerGroup_expectProcessWorkerGroup() {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setProcessInstanceId(1);
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setWorkerGroup("cluster1");
+        Mockito.when(processService.findProcessInstanceById(1)).thenReturn(processInstance);
+
+        String workerGroup = workerGroupService.getTaskWorkerGroup(taskInstance);
+        Assertions.assertEquals("cluster1", workerGroup);
+    }
+
+    @Test
+    public void giveNullTaskAndProcessWorkerGroup_whenGetTaskWorkerGroup_expectDefault() {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setProcessInstanceId(1);
+        Mockito.when(processService.findProcessInstanceById(1)).thenReturn(null);
+
+        String defaultWorkerGroup = workerGroupService.getTaskWorkerGroup(taskInstance);
+        Assertions.assertEquals(Constants.DEFAULT_WORKER_GROUP, defaultWorkerGroup);
+    }
+
     /**
      * get Group
      */
     private WorkerGroup getWorkerGroup(int id) {
         WorkerGroup workerGroup = new WorkerGroup();
-        workerGroup.setName(groupName);
+        workerGroup.setName(GROUP_NAME);
         workerGroup.setId(id);
         return workerGroup;
     }
 
-    private WorkerGroup getWorkerGroup() {
-        return getWorkerGroup(1);
-    }
-
-    private List<WorkerGroup> getList() {
-        List<WorkerGroup> list = new ArrayList<>();
-        list.add(getWorkerGroup());
-        return list;
-    }
-
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 76a6e6ea52..74989430e4 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -119,8 +119,6 @@ public interface ProcessService {
 
     List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode);
 
-    Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList);
-
     List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode);
 
     List<ProcessInstance> queryNeedFailoverProcessInstances(String host);
@@ -150,8 +148,6 @@ public interface ProcessService {
 
     ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId);
 
-    String getTaskWorkerGroup(TaskInstance taskInstance);
-
     List<Project> getProjectListHavePerm(int userId);
 
     <T> List<T> listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 5c6a149657..70d4c52504 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1570,20 +1570,6 @@ public class ProcessServiceImpl implements ProcessService {
         return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
     }
 
-    /**
-     * query Schedule by processDefinitionCode
-     *
-     * @param processDefinitionCodeList processDefinitionCodeList
-     * @see Schedule
-     */
-    @Override
-    public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
-        List<Schedule> processDefinitionScheduleList =
-                scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
-        return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
-                Schedule::getWorkerGroup));
-    }
-
     /**
      * query dependent process definition by process definition code
      *
@@ -1797,29 +1783,6 @@ public class ProcessServiceImpl implements ProcessService {
         return projectMapper.queryProjectWithUserByProcessInstanceId(processInstanceId);
     }
 
-    /**
-     * get task worker group
-     *
-     * @param taskInstance taskInstance
-     * @return workerGroupId
-     */
-    @Override
-    public String getTaskWorkerGroup(TaskInstance taskInstance) {
-        String workerGroup = taskInstance.getWorkerGroup();
-
-        if (!Strings.isNullOrEmpty(workerGroup)) {
-            return workerGroup;
-        }
-        int processInstanceId = taskInstance.getProcessInstanceId();
-        ProcessInstance processInstance = findProcessInstanceById(processInstanceId);
-
-        if (processInstance != null) {
-            return processInstance.getWorkerGroup();
-        }
-        logger.info("task : {} will use default worker group", taskInstance.getId());
-        return Constants.DEFAULT_WORKER_GROUP;
-    }
-
     /**
      * get have perm project list
      *