You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/14 03:31:33 UTC
[inlong] branch master updated: [INLONG-5024][Manager] Improve the delete operation of InlongGroup (#6514)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 530bc4b23 [INLONG-5024][Manager] Improve the delete operation of InlongGroup (#6514)
530bc4b23 is described below
commit 530bc4b23e06a219a31c9ecea3cefad921c01db4
Author: healchow <he...@gmail.com>
AuthorDate: Mon Nov 14 11:31:28 2022 +0800
[INLONG-5024][Manager] Improve the delete operation of InlongGroup (#6514)
---
.../inlong/manager/common/enums/ErrorCodeEnum.java | 16 +-
.../service/group/InlongGroupProcessService.java | 32 +--
.../manager/service/group/InlongGroupService.java | 75 +++---
.../service/group/InlongGroupServiceImpl.java | 285 +++++++++++----------
.../group/UpdateGroupCompleteListener.java | 10 +-
.../listener/group/UpdateGroupFailedListener.java | 1 +
.../service/stream/InlongStreamService.java | 23 +-
.../service/stream/InlongStreamServiceImpl.java | 22 +-
.../web/controller/InlongGroupController.java | 76 +++---
9 files changed, 282 insertions(+), 258 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 87a6e706c..f62e9e920 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -39,17 +39,17 @@ public enum ErrorCodeEnum {
GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation authority"),
GROUP_DUPLICATE(1002, "Inlong group already exists"),
GROUP_INFO_INCORRECT(1003, "Group info was incorrect"),
- GROUP_SAVE_FAILED(1003, "Failed to save/update inlong group"),
- GROUP_PERMISSION_DENIED(1004, "No permission to access this inlong group"),
- GROUP_HAS_STREAM(1005, "There are some valid inlong stream for this inlong group"),
+ GROUP_SAVE_FAILED(1004, "Failed to save/update inlong group"),
+ GROUP_PERMISSION_DENIED(1005, "No permission to access this inlong group"),
GROUP_UPDATE_NOT_ALLOWED(1006, "The current inlong group status does not support modification"),
GROUP_DELETE_NOT_ALLOWED(1007, "The current inlong group status does not support deletion"),
- GROUP_ID_UPDATE_NOT_ALLOWED(1008, "The current inlong group status does not support modifying the group id"),
- GROUP_MIDDLEWARE_UPDATE_NOT_ALLOWED(1011,
- "The current inlong group status does not support modifying the MQ type"),
+ GROUP_DELETE_HAS_STREAM(1008, "The inlong group contains inlong streams and is not allowed to be deleted"),
+
+ GROUP_ID_UPDATE_NOT_ALLOWED(1010, "The current status does not support modifying the inlong group id"),
+ GROUP_MIDDLEWARE_UPDATE_NOT_ALLOWED(1011, "The current status does not support modifying the MQ type"),
GROUP_NAME_UPDATE_NOT_ALLOWED(1012, "The current inlong group status does not support modifying the name"),
GROUP_INFO_INCONSISTENT(1013, "The inlong group info is inconsistent, please contact the administrator"),
- GROUP_MODE_UNSUPPORTED(1014, "The current inlong group mode only support lightweight, standard"),
+ GROUP_MODE_UNSUPPORTED(1014, "The current inlong group mode only support lightweight or standard"),
OPT_NOT_ALLOWED_BY_STATUS(1021, "InlongGroup status %s was not allowed to add/update/delete related info"),
@@ -74,7 +74,7 @@ public enum ErrorCodeEnum {
STREAM_EXT_SAVE_FAILED(1207, "Failed to save/update inlong stream extension information"),
STREAM_FIELD_SAVE_FAILED(1208, "Failed to save/update inlong stream field"),
STREAM_DELETE_HAS_SOURCE(1209, "The inlong stream contains source info and is not allowed to be deleted"),
- STREAM_DELETE_HAS_SINK(1210, "The inlong stream contains data sink info and is not allowed to be deleted"),
+ STREAM_DELETE_HAS_SINK(1210, "The inlong stream contains sink info and is not allowed to be deleted"),
SOURCE_TYPE_IS_NULL(1300, "Source type is null"),
SOURCE_TYPE_NOT_SUPPORT(1301, "Source type '%s' not support"),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index de044f0aa..0109cdec1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -185,36 +185,34 @@ public class InlongGroupProcessService {
* @return inlong group id
*/
public String deleteProcessAsync(String groupId, String operator) {
- LOGGER.info("begin to delete process asynchronously for groupId={} by operator={}", groupId, operator);
+ LOGGER.info("begin to delete group asynchronously for groupId={} by user={}", groupId, operator);
EXECUTOR_SERVICE.execute(() -> {
try {
invokeDeleteProcess(groupId, operator);
- } catch (Exception ex) {
- LOGGER.error("exception while delete process for groupId={} by operator={}", groupId, operator, ex);
- throw ex;
+ } catch (Exception e) {
+ LOGGER.error(String.format("failed to async delete group for groupId=%s by %s", groupId, operator), e);
+ throw e;
}
- groupService.delete(groupId, operator);
});
- LOGGER.info("success to delete process asynchronously for groupId={} by operator={}", groupId, operator);
+ LOGGER.info("success to delete group asynchronously for groupId={} by user={}", groupId, operator);
return groupId;
}
/**
- * Delete InlongGroup logically and delete related resource in an asynchronous way.
+ * Delete InlongGroup logically and delete related resource in a synchronous way.
*/
- public boolean deleteProcess(String groupId, String operator) {
- LOGGER.info("begin to delete process for groupId={} by operator={}", groupId, operator);
+ public Boolean deleteProcess(String groupId, String operator) {
+ LOGGER.info("begin to delete group for groupId={} by user={}", groupId, operator);
try {
invokeDeleteProcess(groupId, operator);
- } catch (Exception ex) {
- LOGGER.error("exception while delete process for groupId={} by operator={}", groupId, operator, ex);
- throw ex;
+ } catch (Exception e) {
+ LOGGER.error(String.format("failed to delete group for groupId=%s by user=%s", groupId, operator), e);
+ throw e;
}
- boolean result = groupService.delete(groupId, operator);
- LOGGER.info("success to delete process for groupId={} by operator={}", groupId, operator);
- return result;
+ LOGGER.info("success to delete group for groupId={} by user={}", groupId, operator);
+ return true;
}
/**
@@ -287,7 +285,9 @@ public class InlongGroupProcessService {
}
private void invokeDeleteProcess(String groupId, String operator) {
- InlongGroupInfo groupInfo = groupService.get(groupId);
+ // check can be deleted
+ InlongGroupInfo groupInfo = groupService.doDeleteCheck(groupId, operator);
+ // start to delete group process
GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.DELETE);
workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, form);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index 3c07faa28..58acbfb27 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -45,6 +45,14 @@ public interface InlongGroupService {
*/
String save(InlongGroupRequest groupInfo, String operator);
+ /**
+ * Query whether the specified group id exists
+ *
+ * @param groupId the group id to be queried
+ * @return does it exist
+ */
+ Boolean exist(String groupId);
+
/**
* Get inlong group info based on inlong group id
*
@@ -53,6 +61,31 @@ public interface InlongGroupService {
*/
InlongGroupInfo get(String groupId);
+ /**
+ * Query the group information of each status of the current user
+ *
+ * @param operator name of operator
+ * @return inlong group status statistics
+ */
+ InlongGroupCountResponse countGroupByUser(String operator);
+
+ /**
+ * According to the group id, query the topic to which it belongs
+ *
+ * @param groupId Inlong group id
+ * @return Topic information
+ * @apiNote TubeMQ corresponds to the group, only 1 topic
+ */
+ InlongGroupTopicInfo getTopic(String groupId);
+
+ /**
+ * According to the group id, query the backup topic to which it belongs
+ *
+ * @param groupId inlong group id
+ * @return backup topic info
+ */
+ InlongGroupTopicInfo getBackupTopic(String groupId);
+
/**
* Paging query inlong group brief info list
*
@@ -79,48 +112,26 @@ public interface InlongGroupService {
* @param operator name of operator
* @return whether succeed
*/
- boolean updateStatus(String groupId, Integer status, String operator);
+ Boolean updateStatus(String groupId, Integer status, String operator);
/**
- * Delete the group information of the specified group id
+ * Check whether deletion is supported for the specified group.
*
- * @param groupId The group id that needs to be deleted
+ * @param groupId inlong group id
* @param operator name of operator
- * @return whether succeed
- */
- boolean delete(String groupId, String operator);
-
- /**
- * Query whether the specified group id exists
- *
- * @param groupId the group id to be queried
- * @return does it exist
+ * @return inlong group info
*/
- Boolean exist(String groupId);
+ InlongGroupInfo doDeleteCheck(String groupId, String operator);
/**
- * Query the group information of each status of the current user
+ * Delete the group information of the specified group id
*
+ * @param groupId The group id that needs to be deleted
* @param operator name of operator
- * @return inlong group status statistics
- */
- InlongGroupCountResponse countGroupByUser(String operator);
-
- /**
- * According to the group id, query the topic to which it belongs
- *
- * @param groupId inlong group id
- * @return topic info
- */
- InlongGroupTopicInfo getTopic(String groupId);
-
- /**
- * According to the group id, query the backup topic to which it belongs
- *
- * @param groupId inlong group id
- * @return backup topic info
+ * @return whether succeed
+ * @apiNote Before invoking this delete method, you must call {@link #doDeleteCheck} to verify deletion is allowed
*/
- InlongGroupTopicInfo getBackupTopic(String groupId);
+ Boolean delete(String groupId, String operator);
/**
* Save the group modified when the approval is passed
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 6a5839065..9c143fd67 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -118,7 +117,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
* @param request request of updated
* @param operator current operator
*/
- private static void checkGroupCanUpdate(InlongGroupEntity entity, InlongGroupRequest request, String operator) {
+ private static void doUpdateCheck(InlongGroupEntity entity, InlongGroupRequest request, String operator) {
if (entity == null || request == null) {
return;
}
@@ -146,8 +145,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
}
}
- @Transactional(rollbackFor = Throwable.class)
@Override
+ @Transactional(rollbackFor = Throwable.class)
public String save(InlongGroupRequest request, String operator) {
LOGGER.debug("begin to save inlong group={} by user={}", request, operator);
Preconditions.checkNotNull(request, "inlong group request cannot be empty");
@@ -158,6 +157,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
LOGGER.error("groupId {} has already exists", groupId);
throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
}
+
request.setEnableZookeeper(enableZookeeper ? InlongConstants.ENABLE_ZK : InlongConstants.DISABLE_ZK);
InlongGroupOperator instance = groupOperatorFactory.getInstance(request.getMqType());
groupId = instance.saveOpt(request, operator);
@@ -169,6 +169,14 @@ public class InlongGroupServiceImpl implements InlongGroupService {
return groupId;
}
+ @Override
+ public Boolean exist(String groupId) {
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+ LOGGER.debug("success to check inlong group {}, exist? {}", groupId, entity != null);
+ return entity != null;
+ }
+
@Override
public InlongGroupInfo get(String groupId) {
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
@@ -192,6 +200,75 @@ public class InlongGroupServiceImpl implements InlongGroupService {
return groupInfo;
}
+ @Override
+ public InlongGroupCountResponse countGroupByUser(String operator) {
+ InlongGroupCountResponse countVO = new InlongGroupCountResponse();
+ List<Map<String, Object>> statusCount = groupMapper.countGroupByUser(operator);
+ for (Map<String, Object> map : statusCount) {
+ int status = (Integer) map.get("status");
+ long count = (Long) map.get("count");
+ countVO.setTotalCount(countVO.getTotalCount() + count);
+ if (status == GroupStatus.CONFIG_ING.getCode()) {
+ countVO.setWaitAssignCount(countVO.getWaitAssignCount() + count);
+ } else if (status == GroupStatus.TO_BE_APPROVAL.getCode()) {
+ countVO.setWaitApproveCount(countVO.getWaitApproveCount() + count);
+ } else if (status == GroupStatus.APPROVE_REJECTED.getCode()) {
+ countVO.setRejectCount(countVO.getRejectCount() + count);
+ }
+ }
+
+ LOGGER.debug("success to count inlong group for operator={}", operator);
+ return countVO;
+ }
+
+ @Override
+ public InlongGroupTopicInfo getTopic(String groupId) {
+ // the group info will not null in get() method
+ InlongGroupInfo groupInfo = this.get(groupId);
+ InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType());
+ InlongGroupTopicInfo topicInfo = groupOperator.getTopic(groupInfo);
+
+ // set the base params
+ topicInfo.setInlongGroupId(groupId);
+ String clusterTag = groupInfo.getInlongClusterTag();
+ topicInfo.setInlongClusterTag(clusterTag);
+
+ // assert: each MQ type has a corresponding type of cluster
+ List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, groupInfo.getMqType());
+ topicInfo.setClusterInfos(clusterInfos);
+
+ LOGGER.debug("success to get topic for groupId={}, result={}", groupId, topicInfo);
+ return topicInfo;
+ }
+
+ @Override
+ public InlongGroupTopicInfo getBackupTopic(String groupId) {
+ // backup topic info saved in the ext table
+ InlongGroupExtEntity extEntity = groupExtMapper.selectByUniqueKey(groupId, BACKUP_CLUSTER_TAG);
+ if (extEntity == null || StringUtils.isBlank(extEntity.getKeyValue())) {
+ LOGGER.warn("not found any backup topic for groupId={}", groupId);
+ return null;
+ }
+
+ // the group info will not null in get() method
+ InlongGroupInfo groupInfo = this.get(groupId);
+ InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType());
+ InlongGroupTopicInfo backupTopicInfo = groupOperator.getBackupTopic(groupInfo);
+
+ // set the base params
+ backupTopicInfo.setInlongGroupId(groupId);
+ String backupClusterTag = extEntity.getKeyValue();
+ backupTopicInfo.setInlongClusterTag(backupClusterTag);
+
+ // set backup cluster info
+ // assert: each MQ type has a corresponding type of cluster
+ List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(backupClusterTag, groupInfo.getMqType());
+ backupTopicInfo.setClusterInfos(clusterInfos);
+
+ LOGGER.debug("success to get backup topic for groupId={}, result={}", groupId, backupTopicInfo);
+ return backupTopicInfo;
+ }
+
@Override
public PageResult<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request) {
if (request.getPageSize() > MAX_PAGE_SIZE) {
@@ -231,7 +308,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
}
@Override
- @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
+ @Transactional(rollbackFor = Throwable.class,
+ isolation = Isolation.REPEATABLE_READ,
propagation = Propagation.REQUIRES_NEW)
public String update(InlongGroupRequest request, String operator) {
LOGGER.debug("begin to update inlong group={} by user={}", request, operator);
@@ -248,7 +326,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
// check whether the current status can be modified
- checkGroupCanUpdate(entity, request, operator);
+ doUpdateCheck(entity, request, operator);
request.setEnableZookeeper(enableZookeeper ? InlongConstants.ENABLE_ZK : InlongConstants.DISABLE_ZK);
InlongGroupOperator instance = groupOperatorFactory.getInstance(request.getMqType());
@@ -264,7 +342,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
propagation = Propagation.REQUIRES_NEW)
- public boolean updateStatus(String groupId, Integer status, String operator) {
+ public Boolean updateStatus(String groupId, Integer status, String operator) {
LOGGER.info("begin to update group status to [{}] for groupId={} by user={}", status, groupId, operator);
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
InlongGroupEntity entity = groupMapper.selectByGroupIdForUpdate(groupId);
@@ -287,137 +365,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
return true;
}
- @Override
- @Transactional(rollbackFor = Throwable.class)
- public boolean delete(String groupId, String operator) {
- LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator);
- Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
-
- InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
- if (entity == null) {
- LOGGER.error("inlong group not found by groupId={}", groupId);
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
-
- // Determine whether the current status can be deleted
- GroupStatus curState = GroupStatus.forCode(entity.getStatus());
- if (GroupStatus.notAllowedTransition(curState, GroupStatus.DELETED)) {
- String errMsg = String.format("Current status=%s was not allowed to delete", curState);
- LOGGER.error(errMsg);
- throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, errMsg);
- }
-
- /*
- If the status allowed logic delete, all associated data can be logically deleted.
- In other status, you need to delete the related "inlong stream" first.
- When deleting a related inlong stream, you also need to check whether
- there are some related "stream source" and "stream sink"
- */
- if (GroupStatus.allowedLogicDelete(curState)) {
- streamService.logicDeleteAll(entity.getInlongGroupId(), operator);
- } else {
- int count = streamService.selectCountByGroupId(groupId);
- if (count >= 1) {
- LOGGER.error("groupId={} have [{}] inlong streams, deleted failed", groupId, count);
- throw new BusinessException(ErrorCodeEnum.GROUP_HAS_STREAM);
- }
- }
-
- // update the group after deleting related info
- entity.setIsDeleted(entity.getId());
- entity.setStatus(GroupStatus.DELETED.getCode());
- entity.setModifier(operator);
- int rowCount = groupMapper.updateByIdentifierSelective(entity);
- if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.error("inlong group has already updated with group id={}, curVersion={}",
- entity.getInlongGroupId(), entity.getVersion());
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
-
- // logically delete the associated extension info
- groupExtMapper.logicDeleteAllByGroupId(groupId);
-
- LOGGER.info("success to delete group and group ext property for groupId={} by user={}", groupId, operator);
- return true;
- }
-
- @Override
- public Boolean exist(String groupId) {
- Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
- InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
- LOGGER.debug("success to check inlong group {}, exist? {}", groupId, entity != null);
- return entity != null;
- }
-
- @Override
- public InlongGroupCountResponse countGroupByUser(String operator) {
- InlongGroupCountResponse countVO = new InlongGroupCountResponse();
- List<Map<String, Object>> statusCount = groupMapper.countGroupByUser(operator);
- for (Map<String, Object> map : statusCount) {
- int status = (Integer) map.get("status");
- long count = (Long) map.get("count");
- countVO.setTotalCount(countVO.getTotalCount() + count);
- if (status == GroupStatus.CONFIG_ING.getCode()) {
- countVO.setWaitAssignCount(countVO.getWaitAssignCount() + count);
- } else if (status == GroupStatus.TO_BE_APPROVAL.getCode()) {
- countVO.setWaitApproveCount(countVO.getWaitApproveCount() + count);
- } else if (status == GroupStatus.APPROVE_REJECTED.getCode()) {
- countVO.setRejectCount(countVO.getRejectCount() + count);
- }
- }
-
- LOGGER.debug("success to count inlong group for operator={}", operator);
- return countVO;
- }
-
- @Override
- public InlongGroupTopicInfo getTopic(String groupId) {
- // the group info will not null in get() method
- InlongGroupInfo groupInfo = this.get(groupId);
- InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType());
- InlongGroupTopicInfo topicInfo = groupOperator.getTopic(groupInfo);
-
- // set the base params
- topicInfo.setInlongGroupId(groupId);
- String clusterTag = groupInfo.getInlongClusterTag();
- topicInfo.setInlongClusterTag(clusterTag);
-
- // assert: each MQ type has a corresponding type of cluster
- List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, groupInfo.getMqType());
- topicInfo.setClusterInfos(clusterInfos);
-
- LOGGER.debug("success to get topic for groupId={}, result={}", groupId, topicInfo);
- return topicInfo;
- }
-
- @Override
- public InlongGroupTopicInfo getBackupTopic(String groupId) {
- // backup topic info saved in the ext table
- InlongGroupExtEntity extEntity = groupExtMapper.selectByUniqueKey(groupId, BACKUP_CLUSTER_TAG);
- if (extEntity == null || StringUtils.isBlank(extEntity.getKeyValue())) {
- LOGGER.warn("not found any backup topic for groupId={}", groupId);
- return null;
- }
-
- // the group info will not null in get() method
- InlongGroupInfo groupInfo = this.get(groupId);
- InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType());
- InlongGroupTopicInfo backupTopicInfo = groupOperator.getBackupTopic(groupInfo);
-
- // set the base params
- backupTopicInfo.setInlongGroupId(groupId);
- String backupClusterTag = extEntity.getKeyValue();
- backupTopicInfo.setInlongClusterTag(backupClusterTag);
-
- // set backup cluster info
- // assert: each MQ type has a corresponding type of cluster
- List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(backupClusterTag, groupInfo.getMqType());
- backupTopicInfo.setClusterInfos(clusterInfos);
-
- LOGGER.debug("success to get backup topic for groupId={}, result={}", groupId, backupTopicInfo);
- return backupTopicInfo;
- }
-
@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
public void updateAfterApprove(InlongGroupApproveRequest approveRequest, String operator) {
@@ -427,10 +374,10 @@ public class InlongGroupServiceImpl implements InlongGroupService {
// only the [TO_BE_APPROVAL] status allowed the passing operation
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
- throw new WorkflowListenerException("inlong group not found with group id=" + groupId);
+ throw new BusinessException("inlong group not found with group id=" + groupId);
}
if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) {
- throw new WorkflowListenerException("inlong group status is [wait_approval], not allowed to approve again");
+ throw new BusinessException("inlong group status [wait_approval] not allowed to approve again");
}
// bind cluster tag and update status to [GROUP_APPROVE_PASSED]
@@ -468,6 +415,64 @@ public class InlongGroupServiceImpl implements InlongGroupService {
LOGGER.info("success to save or update inlong group ext for groupId={}", groupId);
}
+ @Override
+ public InlongGroupInfo doDeleteCheck(String groupId, String operator) {
+ InlongGroupInfo groupInfo = this.get(groupId);
+ // only the persons in charge can delete
+ List<String> inCharges = Arrays.asList(groupInfo.getInCharges().split(InlongConstants.COMMA));
+ if (!inCharges.contains(operator)) {
+ LOGGER.error("user [{}] has no privilege for the inlong group", operator);
+ throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
+ }
+
+ // determine whether the current status can be deleted
+ GroupStatus curState = GroupStatus.forCode(groupInfo.getStatus());
+ if (GroupStatus.notAllowedTransition(curState, GroupStatus.DELETING)) {
+ String errMsg = String.format("current group status=%s was not allowed to delete", curState);
+ LOGGER.error(errMsg);
+ throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, errMsg);
+ }
+
+ // If the status allowed logic delete, all associated info will be logically deleted.
+ // In other status, you need to delete the related "inlong_stream" first.
+ if (!GroupStatus.allowedLogicDelete(curState)) {
+ int count = streamService.selectCountByGroupId(groupId);
+ if (count >= 1) {
+ LOGGER.error("groupId={} have [{}] inlong streams, deleted failed", groupId, count);
+ throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_HAS_STREAM);
+ }
+ }
+
+ return groupInfo;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class)
+ public Boolean delete(String groupId, String operator) {
+ LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator);
+ InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+ Preconditions.checkNotNull(entity, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
+
+ entity.setIsDeleted(entity.getId());
+ entity.setStatus(GroupStatus.DELETED.getCode());
+ entity.setModifier(operator);
+ int rowCount = groupMapper.updateByIdentifierSelective(entity);
+ if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+ LOGGER.error("inlong group has already updated for groupId={} curVersion={}", groupId, entity.getVersion());
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+
+ // logically delete the associated extension info
+ groupExtMapper.logicDeleteAllByGroupId(groupId);
+
+ if (GroupStatus.allowedLogicDelete(GroupStatus.forCode(entity.getStatus()))) {
+ streamService.logicDeleteAll(groupId, operator);
+ }
+
+ LOGGER.info("success to delete group and group ext property for groupId={} by user={}", groupId, operator);
+ return true;
+ }
+
private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
Map<String, String> extMap = new HashMap<>();
extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(), extInfo.getKeyValue()));
@@ -482,7 +487,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
case USER_DEFINED:
return createUserDefinedSortConfig(extMap);
default:
- LOGGER.warn("Unsupported sort config for sortType:{}", sortType);
+ LOGGER.warn("unsupported sort config for sortType: {}", sortType);
return null;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
index 8a4116100..4c76c62bf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.source.StreamSourceService;
@@ -58,23 +59,26 @@ public class UpdateGroupCompleteListener implements ProcessEventListener {
log.info("begin to execute UpdateGroupCompleteListener for groupId={}, operateType={}", groupId, operateType);
// update inlong group status and other configs
+ InlongGroupInfo groupInfo = form.getGroupInfo();
+ InlongGroupRequest groupRequest = groupInfo.genRequest();
String operator = context.getOperator();
switch (operateType) {
case SUSPEND:
groupService.updateStatus(groupId, GroupStatus.SUSPENDED.getCode(), operator);
+ groupService.update(groupRequest, operator);
break;
case RESTART:
groupService.updateStatus(groupId, GroupStatus.RESTARTED.getCode(), operator);
+ groupService.update(groupRequest, operator);
break;
case DELETE:
- groupService.updateStatus(groupId, GroupStatus.DELETED.getCode(), operator);
+ // delete process completed, then delete the group info
+ groupService.delete(groupId, operator);
break;
default:
log.warn("unsupported operate={} for inlong group", operateType);
break;
}
- InlongGroupInfo groupInfo = form.getGroupInfo();
- groupService.update(groupInfo.genRequest(), operator);
// if the inlong group is lightweight mode, the stream source needs to be processed.
if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java
index 2741c50d4..39f02657c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java
@@ -51,6 +51,7 @@ public class UpdateGroupFailedListener implements ProcessEventListener {
// update inlong group status and other info
String operator = context.getOperator();
+ // delete process failed, then change the group status to [CONFIG_FAILED]
groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), operator);
groupService.update(form.getGroupInfo().genRequest(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index bdbdef4d4..3d17b3972 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -43,6 +43,15 @@ public interface InlongStreamService {
*/
Integer save(InlongStreamRequest request, String operator);
+ /**
+ * Query whether the inlong stream ID exists
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return true: exists, false: does not exist
+ */
+ Boolean exist(String groupId, String streamId);
+
/**
* Query the details of the specified inlong stream
*
@@ -60,15 +69,6 @@ public interface InlongStreamService {
*/
List<InlongStreamInfo> list(String groupId);
- /**
- * Query whether the inlong stream ID exists
- *
- * @param groupId inlong group id
- * @param streamId inlong stream id
- * @return true: exists, false: does not exist
- */
- Boolean exist(String groupId, String streamId);
-
/**
* Paging query inlong stream brief info list
*
@@ -103,7 +103,10 @@ public interface InlongStreamService {
Boolean update(InlongStreamRequest request, String operator);
/**
- * Delete the specified inlong stream
+ * Delete the specified inlong stream.
+ * <p>
+ * When deleting an inlong stream, you need to check whether there are some related
+ * stream_sources or stream_sinks
*
* @param groupId Inlong group id
* @param streamId Inlong stream id
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 633d46fca..043df26df 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -126,6 +126,14 @@ public class InlongStreamServiceImpl implements InlongStreamService {
return streamEntity.getId();
}
+ @Override
+ public Boolean exist(String groupId, String streamId) {
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+ return streamEntity != null;
+ }
+
@Override
public InlongStreamInfo get(String groupId, String streamId) {
LOGGER.debug("begin to get inlong stream by groupId={}, streamId={}", groupId, streamId);
@@ -183,14 +191,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
return streamList;
}
- @Override
- public Boolean exist(String groupId, String streamId) {
- Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
- Preconditions.checkNotNull(groupId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
- InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
- return streamEntity != null;
- }
-
/**
* Query and set the extended information and data source fields of the inlong stream
*/
@@ -307,7 +307,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
// Check whether the current inlong group status supports modification
- this.checkCanUpdate(inlongGroupEntity.getStatus(), streamEntity, request);
+ this.doUpdateCheck(inlongGroupEntity.getStatus(), streamEntity, request);
CommonBeanUtils.copyProperties(request, streamEntity, true);
streamEntity.setModifier(operator);
@@ -326,8 +326,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
return true;
}
- @Transactional(rollbackFor = Throwable.class)
@Override
+ @Transactional(rollbackFor = Throwable.class)
public Boolean delete(String groupId, String streamId, String operator) {
LOGGER.debug("begin to delete inlong stream, groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
@@ -580,7 +580,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
* @param streamEntity Original inlong stream entity
* @param request New inlong stream information
*/
- private void checkCanUpdate(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) {
+ private void doUpdateCheck(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) {
if (streamEntity == null || request == null) {
return;
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 959d005e4..e3375618c 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -68,6 +68,13 @@ public class InlongGroupController {
return Response.success(groupService.save(groupRequest, operator));
}
+ @RequestMapping(value = "/group/exist/{groupId}", method = RequestMethod.GET)
+ @ApiOperation(value = "Is the inlong group id exists")
+ @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
+ public Response<Boolean> exist(@PathVariable String groupId) {
+ return Response.success(groupService.exist(groupId));
+ }
+
@RequestMapping(value = "/group/get/{groupId}", method = RequestMethod.GET)
@ApiOperation(value = "Get inlong group")
@ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
@@ -75,6 +82,25 @@ public class InlongGroupController {
return Response.success(groupService.get(groupId));
}
+ @RequestMapping(value = "/group/countByStatus", method = RequestMethod.GET)
+ @ApiOperation(value = "Count inlong group status for current user")
+ public Response<InlongGroupCountResponse> countGroupByUser() {
+ String operator = LoginUserUtils.getLoginUser().getName();
+ return Response.success(groupService.countGroupByUser(operator));
+ }
+
+ @GetMapping(value = "/group/getTopic/{groupId}")
+ @ApiOperation(value = "Get topic info")
+ public Response<InlongGroupTopicInfo> getTopic(@PathVariable String groupId) {
+ return Response.success(groupService.getTopic(groupId));
+ }
+
+ @GetMapping(value = "/group/getBackupTopic/{groupId}")
+ @ApiOperation(value = "Get backup topic info")
+ public Response<InlongGroupTopicInfo> getBackupTopic(@PathVariable String groupId) {
+ return Response.success(groupService.getBackupTopic(groupId));
+ }
+
@RequestMapping(value = "/group/list", method = RequestMethod.POST)
@ApiOperation(value = "List inlong groups by paginating")
public Response<PageResult<InlongGroupBriefInfo>> listBrief(@RequestBody InlongGroupPageRequest request) {
@@ -91,30 +117,22 @@ public class InlongGroupController {
return Response.success(groupService.update(groupRequest, operator));
}
- @RequestMapping(value = "/group/exist/{groupId}", method = RequestMethod.GET)
- @ApiOperation(value = "Is the inlong group id exists")
+ @RequestMapping(value = "/group/delete/{groupId}", method = RequestMethod.DELETE)
+ @ApiOperation(value = "Delete inlong group info")
+ @OperationLog(operation = OperationType.DELETE)
@ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
- public Response<Boolean> exist(@PathVariable String groupId) {
- return Response.success(groupService.exist(groupId));
- }
-
- @RequestMapping(value = "/group/countByStatus", method = RequestMethod.GET)
- @ApiOperation(value = "Count inlong group status for current user")
- public Response<InlongGroupCountResponse> countGroupByUser() {
+ public Response<Boolean> delete(@PathVariable String groupId) {
String operator = LoginUserUtils.getLoginUser().getName();
- return Response.success(groupService.countGroupByUser(operator));
- }
-
- @GetMapping(value = "/group/getTopic/{groupId}")
- @ApiOperation(value = "Get topic info")
- public Response<InlongGroupTopicInfo> getTopic(@PathVariable String groupId) {
- return Response.success(groupService.getTopic(groupId));
+ return Response.success(groupProcessOperation.deleteProcess(groupId, operator));
}
- @GetMapping(value = "/group/getBackupTopic/{groupId}")
- @ApiOperation(value = "Get backup topic info")
- public Response<InlongGroupTopicInfo> getBackupTopic(@PathVariable String groupId) {
- return Response.success(groupService.getBackupTopic(groupId));
+ @RequestMapping(value = "/group/deleteAsync/{groupId}", method = RequestMethod.DELETE)
+ @ApiOperation(value = "Delete inlong group info")
+ @OperationLog(operation = OperationType.DELETE)
+ @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
+ public Response<String> deleteAsync(@PathVariable String groupId) {
+ String operator = LoginUserUtils.getLoginUser().getName();
+ return Response.success(groupProcessOperation.deleteProcessAsync(groupId, operator));
}
@RequestMapping(value = "/group/startProcess/{groupId}", method = RequestMethod.POST)
@@ -141,15 +159,6 @@ public class InlongGroupController {
return Response.success(groupProcessOperation.restartProcess(groupId, operator));
}
- @RequestMapping(value = "/group/delete/{groupId}", method = RequestMethod.DELETE)
- @ApiOperation(value = "Delete inlong group info")
- @OperationLog(operation = OperationType.DELETE)
- @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
- public Response<Boolean> delete(@PathVariable String groupId) {
- String operator = LoginUserUtils.getLoginUser().getName();
- return Response.success(groupProcessOperation.deleteProcess(groupId, operator));
- }
-
@RequestMapping(value = "/group/suspendProcessAsync/{groupId}", method = RequestMethod.POST)
@ApiOperation(value = "Suspend inlong group process")
@ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class)
@@ -166,15 +175,6 @@ public class InlongGroupController {
return Response.success(groupProcessOperation.restartProcessAsync(groupId, operator));
}
- @RequestMapping(value = "/group/deleteAsync/{groupId}", method = RequestMethod.DELETE)
- @ApiOperation(value = "Delete inlong group info")
- @OperationLog(operation = OperationType.DELETE)
- @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
- public Response<String> deleteAsync(@PathVariable String groupId) {
- String operator = LoginUserUtils.getLoginUser().getName();
- return Response.success(groupProcessOperation.deleteProcessAsync(groupId, operator));
- }
-
@PostMapping(value = "/group/reset")
@ApiOperation(value = "Reset group status when group is in CONFIG_ING|SUSPENDING|RESTARTING|DELETING")
public Response<Boolean> reset(@RequestBody @Validated InlongGroupResetRequest request) {