You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2023/10/11 08:06:18 UTC
[dolphinscheduler] 16/40: [Improvement][API] Support to check if the worker group has been used by any tasks or schedulers when users delete or rename it. (#14893)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch 3.2.0-release
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit b77d5c4b885793b24fec807038d590627c788adc
Author: calvin <ji...@163.com>
AuthorDate: Wed Sep 13 10:34:59 2023 +0800
[Improvement][API] Support to check if the worker group has been used by any tasks or schedulers when users delete or rename it. (#14893)
(cherry picked from commit a070aa93a72c6252b49514e66bb3b05291deacb7)
---
.../apache/dolphinscheduler/api/enums/Status.java | 15 ++++-
.../api/service/impl/WorkerGroupServiceImpl.java | 75 ++++++++++++++++++++--
.../api/service/WorkerGroupServiceTest.java | 17 ++++-
3 files changed, 98 insertions(+), 9 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 4210b918b3..49058ad2e6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -568,7 +568,20 @@ public enum Status {
SCHEDULE_TIME_NUMBER_EXCEED(1400003, "The number of complement dates exceed 100.", "补数日期个数超过100"),
DESCRIPTION_TOO_LONG_ERROR(1400004, "description is too long error", "描述过长"),
DELETE_WORKER_GROUP_BY_ID_FAIL_ENV(1400005,
- "delete worker group fail, for there are [{0}] enviroments using:{1}", "删除工作组失败,有 [{0}] 个环境正在使用:{1}");
+ "delete worker group fail, for there are [{0}] enviroments using:{1}", "删除工作组失败,有 [{0}] 个环境正在使用:{1}"),
+
+ WORKER_GROUP_DEPENDENT_TASK_EXISTS(1401000,
+ "You can not modify or remove this worker group, cause it has [{0}] dependent tasks like :{1}",
+ "不能修改或删除该Worker组,有 [{0}] 个任务正在使用:{1}"),
+
+ WORKER_GROUP_DEPENDENT_SCHEDULER_EXISTS(1401001,
+ "You can not modify or remove this worker group, cause it has [{0}] dependent workflow timings like :{1}",
+ "不能修改或删除该Worker组,有 [{0}] 个工作流定时正在使用:{1}"),
+
+ WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS(1401002,
+ "You can not modify or remove this worker group, cause it has [{0}] dependent environments.",
+ "不能修改或删除该Worker组,有 [{0}] 个环境配置正在使用"),
+ ;
private final int code;
private final String enMsg;
private final String zhMsg;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 34aa50bb18..e097e2ff58 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -27,15 +27,19 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@@ -61,6 +65,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.facebook.presto.jdbc.internal.guava.base.Strings;
/**
@@ -88,6 +93,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Autowired
private ScheduleMapper scheduleMapper;
+ @Autowired
+ private TaskDefinitionMapper taskDefinitionMapper;
+
+ @Autowired
+ private ProcessDefinitionMapper processDefinitionMapper;
+
/**
* create or update a worker group
*
@@ -115,11 +126,17 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
WorkerGroup workerGroup = null;
if (id != 0) {
workerGroup = workerGroupMapper.selectById(id);
+ if (Objects.nonNull(workerGroup) && !workerGroup.getName().equals(name)) {
+ if (checkWorkerGroupDependencies(workerGroup, result)) {
+ return result;
+ }
+ }
}
if (workerGroup == null) {
workerGroup = new WorkerGroup();
workerGroup.setCreateTime(now);
}
+
workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
@@ -136,6 +153,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
return result;
}
+
handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson);
log.info("Worker group save complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
@@ -177,6 +195,53 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
return false;
}
+ /**
+ * check if the worker group has any dependent tasks,schedulers or environments.
+ *
+ * @param workerGroup worker group
+ * @return boolean
+ */
+ private boolean checkWorkerGroupDependencies(WorkerGroup workerGroup, Map<String, Object> result) {
+ // check if the worker group has any dependent tasks
+ List<TaskDefinition> taskDefinitions = taskDefinitionMapper.selectList(
+ new QueryWrapper<TaskDefinition>().lambda().eq(TaskDefinition::getWorkerGroup, workerGroup.getName()));
+
+ if (CollectionUtils.isNotEmpty(taskDefinitions)) {
+ List<String> taskNames = taskDefinitions.stream().limit(3).map(taskDefinition -> taskDefinition.getName())
+ .collect(Collectors.toList());
+
+ putMsg(result, Status.WORKER_GROUP_DEPENDENT_TASK_EXISTS, taskDefinitions.size(),
+ JSONUtils.toJsonString(taskNames));
+ return true;
+ }
+
+ // check if the worker group has any dependent schedulers
+ List<Schedule> schedules = scheduleMapper
+ .selectList(new QueryWrapper<Schedule>().lambda().eq(Schedule::getWorkerGroup, workerGroup.getName()));
+
+ if (CollectionUtils.isNotEmpty(schedules)) {
+ List<String> processNames = schedules.stream().limit(3)
+ .map(schedule -> processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode()).getName())
+ .collect(Collectors.toList());
+
+ putMsg(result, Status.WORKER_GROUP_DEPENDENT_SCHEDULER_EXISTS, schedules.size(),
+ JSONUtils.toJsonString(processNames));
+ return true;
+ }
+
+ // check if the worker group has any dependent environments
+ List<EnvironmentWorkerGroupRelation> environmentWorkerGroupRelations =
+ environmentWorkerGroupRelationMapper.selectList(new QueryWrapper<EnvironmentWorkerGroupRelation>()
+ .lambda().eq(EnvironmentWorkerGroupRelation::getWorkerGroup, workerGroup.getName()));
+
+ if (CollectionUtils.isNotEmpty(environmentWorkerGroupRelations)) {
+ putMsg(result, Status.WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS, environmentWorkerGroupRelations.size());
+ return true;
+ }
+
+ return false;
+ }
+
/**
* check worker group addr list
*
@@ -341,15 +406,13 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
return result;
}
- List<EnvironmentWorkerGroupRelation> environmentWorkerGroupRelationList =
- environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName());
- if (CollectionUtils.isNotEmpty(environmentWorkerGroupRelationList)) {
- putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL_ENV, environmentWorkerGroupRelationList.size(),
- workerGroup.getName());
+
+ if (checkWorkerGroupDependencies(workerGroup, result)) {
return result;
}
+
workerGroupMapper.deleteById(id);
- processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), "");
+
log.info("Delete worker group complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
return result;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index bf2b9363b2..08a541c5bb 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -35,6 +35,8 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@@ -90,6 +92,12 @@ public class WorkerGroupServiceTest {
@Mock
private EnvironmentWorkerGroupRelationMapper environmentWorkerGroupRelationMapper;
+ @Mock
+ private TaskDefinitionMapper taskDefinitionMapper;
+
+ @Mock
+ private ScheduleMapper scheduleMapper;
+
private final String GROUP_NAME = "testWorkerGroup";
private User getLoginUser() {
@@ -257,11 +265,16 @@ public class WorkerGroupServiceTest {
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)).thenReturn(null);
+
Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1);
- Mockito.when(processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), ""))
- .thenReturn(1);
+
Mockito.when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName()))
.thenReturn(null);
+
+ Mockito.when(taskDefinitionMapper.selectList(Mockito.any())).thenReturn(null);
+
+ Mockito.when(scheduleMapper.selectList(Mockito.any())).thenReturn(null);
+
Map<String, Object> successResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(),
((Status) successResult.get(Constants.STATUS)).getCode());