You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2023/10/11 08:06:18 UTC

[dolphinscheduler] 16/40: [Improvement][API] Support to check if the worker group has been used by any tasks or schedulers when users delete or rename it. (#14893)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.2.0-release
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit b77d5c4b885793b24fec807038d590627c788adc
Author: calvin <ji...@163.com>
AuthorDate: Wed Sep 13 10:34:59 2023 +0800

    [Improvement][API] Support to check if the worker group has been used by any tasks or schedulers when users delete or rename it.  (#14893)
    
    (cherry picked from commit a070aa93a72c6252b49514e66bb3b05291deacb7)
---
 .../apache/dolphinscheduler/api/enums/Status.java  | 15 ++++-
 .../api/service/impl/WorkerGroupServiceImpl.java   | 75 ++++++++++++++++++++--
 .../api/service/WorkerGroupServiceTest.java        | 17 ++++-
 3 files changed, 98 insertions(+), 9 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 4210b918b3..49058ad2e6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -568,7 +568,20 @@ public enum Status {
     SCHEDULE_TIME_NUMBER_EXCEED(1400003, "The number of complement dates exceed 100.", "补数日期个数超过100"),
     DESCRIPTION_TOO_LONG_ERROR(1400004, "description is too long error", "描述过长"),
     DELETE_WORKER_GROUP_BY_ID_FAIL_ENV(1400005,
-            "delete worker group fail, for there are [{0}] enviroments using:{1}", "删除工作组失败,有 [{0}] 个环境正在使用:{1}");
+            "delete worker group fail, for there are [{0}] enviroments using:{1}", "删除工作组失败,有 [{0}] 个环境正在使用:{1}"),
+
+    WORKER_GROUP_DEPENDENT_TASK_EXISTS(1401000,
+            "You can not modify or remove this worker group, cause it has [{0}] dependent tasks like :{1}",
+            "不能修改或删除该Worker组,有 [{0}] 个任务正在使用:{1}"),
+
+    WORKER_GROUP_DEPENDENT_SCHEDULER_EXISTS(1401001,
+            "You can not modify or remove this worker group, cause it has [{0}] dependent workflow timings like :{1}",
+            "不能修改或删除该Worker组,有 [{0}] 个工作流定时正在使用:{1}"),
+
+    WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS(1401002,
+            "You can not modify or remove this worker group, cause it has [{0}] dependent environments.",
+            "不能修改或删除该Worker组,有 [{0}] 个环境配置正在使用"),
+            ;
     private final int code;
     private final String enMsg;
     private final String zhMsg;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 34aa50bb18..e097e2ff58 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -27,15 +27,19 @@ import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
 import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@@ -61,6 +65,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.facebook.presto.jdbc.internal.guava.base.Strings;
 
 /**
@@ -88,6 +93,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
     @Autowired
     private ScheduleMapper scheduleMapper;
 
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
     /**
      * create or update a worker group
      *
@@ -115,11 +126,17 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
         WorkerGroup workerGroup = null;
         if (id != 0) {
             workerGroup = workerGroupMapper.selectById(id);
+            if (Objects.nonNull(workerGroup) && !workerGroup.getName().equals(name)) {
+                if (checkWorkerGroupDependencies(workerGroup, result)) {
+                    return result;
+                }
+            }
         }
         if (workerGroup == null) {
             workerGroup = new WorkerGroup();
             workerGroup.setCreateTime(now);
         }
+
         workerGroup.setName(name);
         workerGroup.setAddrList(addrList);
         workerGroup.setUpdateTime(now);
@@ -136,6 +153,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
             putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
             return result;
         }
+
         handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson);
         log.info("Worker group save complete, workerGroupName:{}.", workerGroup.getName());
         putMsg(result, Status.SUCCESS);
@@ -177,6 +195,53 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
         return false;
     }
 
+    /**
+     * Check whether any task definition, schedule or environment still references the worker group.
+     * On the first dependency found, the matching error status is written into {@code result}.
+     *
+     * @param workerGroup worker group whose name is looked up
+     * @param result      result map that receives the error status and message
+     * @return true if a dependency exists (result is populated), false otherwise
+     */
+    private boolean checkWorkerGroupDependencies(WorkerGroup workerGroup, Map<String, Object> result) {
+        // check if the worker group has any dependent tasks
+        List<TaskDefinition> taskDefinitions = taskDefinitionMapper.selectList(
+                new QueryWrapper<TaskDefinition>().lambda().eq(TaskDefinition::getWorkerGroup, workerGroup.getName()));
+        if (CollectionUtils.isNotEmpty(taskDefinitions)) {
+            // only the first few names are listed as examples; the full count is reported separately
+            List<String> taskNames = taskDefinitions.stream().limit(3).map(TaskDefinition::getName)
+                    .collect(Collectors.toList());
+            putMsg(result, Status.WORKER_GROUP_DEPENDENT_TASK_EXISTS, taskDefinitions.size(),
+                    JSONUtils.toJsonString(taskNames));
+            return true;
+        }
+
+        // check if the worker group has any dependent schedulers
+        List<Schedule> schedules = scheduleMapper
+                .selectList(new QueryWrapper<Schedule>().lambda().eq(Schedule::getWorkerGroup, workerGroup.getName()));
+        if (CollectionUtils.isNotEmpty(schedules)) {
+            // guard against a schedule whose workflow definition cannot be resolved (queryByCode returning null)
+            List<String> processNames = schedules.stream().limit(3)
+                    .map(schedule -> processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode()))
+                    .filter(Objects::nonNull)
+                    .map(definition -> definition.getName())
+                    .collect(Collectors.toList());
+            putMsg(result, Status.WORKER_GROUP_DEPENDENT_SCHEDULER_EXISTS, schedules.size(),
+                    JSONUtils.toJsonString(processNames));
+            return true;
+        }
+
+        // check if the worker group has any dependent environments
+        List<EnvironmentWorkerGroupRelation> environmentWorkerGroupRelations =
+                environmentWorkerGroupRelationMapper.selectList(new QueryWrapper<EnvironmentWorkerGroupRelation>()
+                        .lambda().eq(EnvironmentWorkerGroupRelation::getWorkerGroup, workerGroup.getName()));
+        if (CollectionUtils.isNotEmpty(environmentWorkerGroupRelations)) {
+            putMsg(result, Status.WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS, environmentWorkerGroupRelations.size());
+            return true;
+        }
+        return false;
+    }
+
     /**
      * check worker group addr list
      *
@@ -341,15 +406,13 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
             putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
             return result;
         }
-        List<EnvironmentWorkerGroupRelation> environmentWorkerGroupRelationList =
-                environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName());
-        if (CollectionUtils.isNotEmpty(environmentWorkerGroupRelationList)) {
-            putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL_ENV, environmentWorkerGroupRelationList.size(),
-                    workerGroup.getName());
+
+        if (checkWorkerGroupDependencies(workerGroup, result)) {
             return result;
         }
+
         workerGroupMapper.deleteById(id);
-        processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), "");
+
         log.info("Delete worker group complete, workerGroupName:{}.", workerGroup.getName());
         putMsg(result, Status.SUCCESS);
         return result;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index bf2b9363b2..08a541c5bb 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -35,6 +35,8 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
 import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@@ -90,6 +92,12 @@ public class WorkerGroupServiceTest {
     @Mock
     private EnvironmentWorkerGroupRelationMapper environmentWorkerGroupRelationMapper;
 
+    @Mock
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Mock
+    private ScheduleMapper scheduleMapper;
+
     private final String GROUP_NAME = "testWorkerGroup";
 
     private User getLoginUser() {
@@ -257,11 +265,16 @@ public class WorkerGroupServiceTest {
         Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
         Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
                 org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)).thenReturn(null);
+
         Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1);
-        Mockito.when(processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), ""))
-                .thenReturn(1);
+
         Mockito.when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName()))
                 .thenReturn(null);
+
+        Mockito.when(taskDefinitionMapper.selectList(Mockito.any())).thenReturn(null);
+
+        Mockito.when(scheduleMapper.selectList(Mockito.any())).thenReturn(null);
+
         Map<String, Object> successResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
         Assertions.assertEquals(Status.SUCCESS.getCode(),
                 ((Status) successResult.get(Constants.STATUS)).getCode());