You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/07 06:51:06 UTC
[inlong] branch master updated: [INLONG-4874][Manager] Cascade modification of associated data when modifying or deleting cluster tag (#4886)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2bc83569f [INLONG-4874][Manager] Cascade modification of associated data when modifying or deleting cluster tag (#4886)
2bc83569f is described below
commit 2bc83569ffdac001a848fdbfc4aff4de99e46f48
Author: healchow <he...@gmail.com>
AuthorDate: Thu Jul 7 14:51:00 2022 +0800
[INLONG-4874][Manager] Cascade modification of associated data when modifying or deleting cluster tag (#4886)
---
.../dao/mapper/InlongClusterEntityMapper.java | 2 +-
.../dao/mapper/InlongGroupEntityMapper.java | 2 +
.../mappers/InlongClusterEntityMapper.xml | 7 +-
.../resources/mappers/InlongGroupEntityMapper.xml | 8 ++
.../service/cluster/InlongClusterServiceImpl.java | 98 ++++++++++++++++++----
5 files changed, 97 insertions(+), 20 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
index 7a86fcdee..3ae403795 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
@@ -32,7 +32,7 @@ public interface InlongClusterEntityMapper {
InlongClusterEntity selectById(Integer id);
/**
- * Select clusters by tags, name and type, the tag and name can be null.
+ * Select clusters by tags, name and type.
*/
List<InlongClusterEntity> selectByKey(@Param("clusterTag") String clusterTag, @Param("name") String name,
@Param("type") String type);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java
index 65c4ce5f1..76b2e0b49 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java
@@ -43,6 +43,8 @@ public interface InlongGroupEntityMapper {
List<InlongGroupBriefInfo> selectBriefList(InlongGroupPageRequest request);
+ List<InlongGroupEntity> selectByClusterTag(@Param(value = "inlongClusterTag") String inlongClusterTag);
+
int updateByPrimaryKey(InlongGroupEntity record);
int updateByIdentifierSelective(InlongGroupEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
index 5f6b5f5e9..83be1b4bd 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
@@ -65,14 +65,15 @@
from inlong_cluster
where id = #{id,jdbcType=INTEGER}
</select>
- <select id="selectByKey" parameterType="org.apache.inlong.manager.common.pojo.cluster.ClusterRequest"
- resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
+ <select id="selectByKey" resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
select
<include refid="Base_Column_List"/>
from inlong_cluster
<where>
is_deleted = 0
- and `type` = #{type, jdbcType=VARCHAR}
+ <if test="type != null and type != ''">
+ and `type` = #{type, jdbcType=VARCHAR}
+ </if>
<if test="clusterTag != null and clusterTag != ''">
and find_in_set(#{clusterTag, jdbcType=VARCHAR}, cluster_tags)
</if>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
index 591c78b7f..ef05a4c5a 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
@@ -185,6 +185,14 @@
order by modify_time desc
limit 100
</select>
+ <select id="selectByClusterTag" resultType="org.apache.inlong.manager.dao.entity.InlongGroupEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_group
+ where is_deleted = 0
+ and inlong_cluster_tag = #{inlongClusterTag, jdbcType=VARCHAR}
+ limit 10
+ </select>
<update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.InlongGroupEntity">
update inlong_group
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index ac198a4b6..ca1690536 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -55,6 +55,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterTagEntityMapper;
@@ -158,18 +159,43 @@ public class InlongClusterServiceImpl implements InlongClusterService {
public Boolean updateTag(ClusterTagRequest request, String operator) {
LOGGER.debug("begin to update cluster tag={}", request);
Preconditions.checkNotNull(request, "inlong cluster request cannot be empty");
- Preconditions.checkNotNull(request.getClusterTag(), "inlong cluster tag cannot be empty");
+ String newClusterTag = request.getClusterTag();
+ Preconditions.checkNotNull(newClusterTag, "inlong cluster tag cannot be empty");
Integer id = request.getId();
Preconditions.checkNotNull(id, "cluster tag id cannot be empty");
- // check cluster tag if exist
- InlongClusterTagEntity exist = clusterTagMapper.selectByTag(request.getClusterTag());
- if (exist != null && !Objects.equals(id, exist.getId())) {
- String errMsg = String.format("inlong cluster tag [%s] already exist", request.getClusterTag());
+ InlongClusterTagEntity exist = clusterTagMapper.selectById(id);
+ if (exist == null) {
+ LOGGER.warn("inlong cluster tag was not exist for id={}", id);
+ return true;
+ }
+ InlongClusterTagEntity tagExist = clusterTagMapper.selectByTag(newClusterTag);
+ if (tagExist != null) {
+ String errMsg = String.format("inlong cluster tag [%s] already exist", newClusterTag);
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}
+ // check if there are some InlongGroups that uses this tag
+ String oldClusterTag = exist.getClusterTag();
+ this.assertNoInlongGroupExists(oldClusterTag);
+
+ // update the associated cluster tag in inlong_cluster
+ Date now = new Date();
+ List<InlongClusterEntity> clusterEntities = clusterMapper.selectByKey(newClusterTag, null, null);
+ if (CollectionUtils.isNotEmpty(clusterEntities)) {
+ clusterEntities.forEach(entity -> {
+ HashSet<String> tagSet = Sets.newHashSet(entity.getClusterTags().split(","));
+ tagSet.remove(oldClusterTag);
+ tagSet.add(newClusterTag);
+ String updateTags = Joiner.on(",").join(tagSet);
+ entity.setClusterTags(updateTags);
+ entity.setModifier(operator);
+ entity.setModifyTime(now);
+ clusterMapper.updateByIdSelective(entity);
+ });
+ }
+
InlongClusterTagEntity entity = clusterTagMapper.selectById(id);
if (entity == null) {
LOGGER.error("cluster tag not found by id={}", id);
@@ -186,14 +212,28 @@ public class InlongClusterServiceImpl implements InlongClusterService {
@Override
public Boolean deleteTag(Integer id, String operator) {
Preconditions.checkNotNull(id, "cluster tag id cannot be empty");
- InlongClusterTagEntity entity = clusterTagMapper.selectById(id);
- if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
+ InlongClusterTagEntity exist = clusterTagMapper.selectById(id);
+ if (exist == null || exist.getIsDeleted() > InlongConstants.UN_DELETED) {
LOGGER.error("inlong cluster tag not found by id={}", id);
return false;
}
- entity.setIsDeleted(entity.getId());
- entity.setModifier(operator);
- clusterTagMapper.updateById(entity);
+
+ // check if there are some InlongGroups that uses this tag
+ String clusterTag = exist.getClusterTag();
+ this.assertNoInlongGroupExists(clusterTag);
+
+ // update the associated cluster tag in inlong_cluster
+ Date now = new Date();
+ List<InlongClusterEntity> clusterEntities = clusterMapper.selectByKey(clusterTag, null, null);
+ if (CollectionUtils.isNotEmpty(clusterEntities)) {
+ clusterEntities.forEach(entity -> {
+ this.removeClusterTag(entity, clusterTag, operator, now);
+ });
+ }
+
+ exist.setIsDeleted(exist.getId());
+ exist.setModifier(operator);
+ clusterTagMapper.updateById(exist);
LOGGER.info("success to delete cluster tag by id={}", id);
return true;
}
@@ -304,6 +344,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
String clusterTag = request.getClusterTag();
Preconditions.checkNotNull(clusterTag, "cluster tag cannot be empty");
+ Date now = new Date();
if (CollectionUtils.isNotEmpty(request.getBindClusters())) {
request.getBindClusters().forEach(id -> {
InlongClusterEntity entity = clusterMapper.selectById(id);
@@ -314,6 +355,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
updateEntity.setId(id);
updateEntity.setClusterTags(updateTags);
updateEntity.setModifier(operator);
+ entity.setModifyTime(now);
clusterMapper.updateByIdSelective(updateEntity);
});
}
@@ -321,12 +363,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
if (CollectionUtils.isNotEmpty(request.getUnbindClusters())) {
request.getUnbindClusters().forEach(id -> {
InlongClusterEntity entity = clusterMapper.selectById(id);
- HashSet<String> tagSet = Sets.newHashSet(entity.getClusterTags().split(","));
- tagSet.remove(clusterTag);
- String updateTags = Joiner.on(",").join(tagSet);
- entity.setClusterTags(updateTags);
- entity.setModifier(operator);
- clusterMapper.updateByIdSelective(entity);
+ this.removeClusterTag(entity, clusterTag, operator, now);
});
}
LOGGER.info("success to bind or unbind cluster tag {} by {}", request, operator);
@@ -629,4 +666,33 @@ public class InlongClusterServiceImpl implements InlongClusterService {
return configJson;
}
+ /**
+ * Remove cluster tag from the given cluster entity.
+ */
+ private void removeClusterTag(InlongClusterEntity entity, String clusterTag, String operator, Date modifyTime) {
+ HashSet<String> tagSet = Sets.newHashSet(entity.getClusterTags().split(","));
+ tagSet.remove(clusterTag);
+ String updateTags = Joiner.on(",").join(tagSet);
+ entity.setClusterTags(updateTags);
+ entity.setModifier(operator);
+ entity.setModifyTime(modifyTime);
+ clusterMapper.updateByIdSelective(entity);
+ }
+
+ /**
+ * Make sure there is no InlongGroup using this tag.
+ */
+ private void assertNoInlongGroupExists(String clusterTag) {
+ List<InlongGroupEntity> groupEntities = groupMapper.selectByClusterTag(clusterTag);
+ if (CollectionUtils.isEmpty(groupEntities)) {
+ return;
+ }
+ List<String> groupIds = groupEntities.stream()
+ .map(InlongGroupEntity::getInlongGroupId)
+ .collect(Collectors.toList());
+ String errMsg = String.format("inlong cluster tag [%s] was used by inlong group %s", clusterTag, groupIds);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg + ", please delete them first");
+ }
+
}