You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/14 03:31:33 UTC

[inlong] branch master updated: [INLONG-5024][Manager] Improve the delete operation of InlongGroup (#6514)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 530bc4b23 [INLONG-5024][Manager] Improve the delete operation of InlongGroup (#6514)
530bc4b23 is described below

commit 530bc4b23e06a219a31c9ecea3cefad921c01db4
Author: healchow <he...@gmail.com>
AuthorDate: Mon Nov 14 11:31:28 2022 +0800

    [INLONG-5024][Manager] Improve the delete operation of InlongGroup (#6514)
---
 .../inlong/manager/common/enums/ErrorCodeEnum.java |  16 +-
 .../service/group/InlongGroupProcessService.java   |  32 +--
 .../manager/service/group/InlongGroupService.java  |  75 +++---
 .../service/group/InlongGroupServiceImpl.java      | 285 +++++++++++----------
 .../group/UpdateGroupCompleteListener.java         |  10 +-
 .../listener/group/UpdateGroupFailedListener.java  |   1 +
 .../service/stream/InlongStreamService.java        |  23 +-
 .../service/stream/InlongStreamServiceImpl.java    |  22 +-
 .../web/controller/InlongGroupController.java      |  76 +++---
 9 files changed, 282 insertions(+), 258 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 87a6e706c..f62e9e920 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -39,17 +39,17 @@ public enum ErrorCodeEnum {
     GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation authority"),
     GROUP_DUPLICATE(1002, "Inlong group already exists"),
     GROUP_INFO_INCORRECT(1003, "Group info was incorrect"),
-    GROUP_SAVE_FAILED(1003, "Failed to save/update inlong group"),
-    GROUP_PERMISSION_DENIED(1004, "No permission to access this inlong group"),
-    GROUP_HAS_STREAM(1005, "There are some valid inlong stream for this inlong group"),
+    GROUP_SAVE_FAILED(1004, "Failed to save/update inlong group"),
+    GROUP_PERMISSION_DENIED(1005, "No permission to access this inlong group"),
     GROUP_UPDATE_NOT_ALLOWED(1006, "The current inlong group status does not support modification"),
     GROUP_DELETE_NOT_ALLOWED(1007, "The current inlong group status does not support deletion"),
-    GROUP_ID_UPDATE_NOT_ALLOWED(1008, "The current inlong group status does not support modifying the group id"),
-    GROUP_MIDDLEWARE_UPDATE_NOT_ALLOWED(1011,
-            "The current inlong group status does not support modifying the MQ type"),
+    GROUP_DELETE_HAS_STREAM(1008, "The inlong group contains inlong streams and is not allowed to be deleted"),
+
+    GROUP_ID_UPDATE_NOT_ALLOWED(1010, "The current status does not support modifying the inlong group id"),
+    GROUP_MIDDLEWARE_UPDATE_NOT_ALLOWED(1011, "The current status does not support modifying the MQ type"),
     GROUP_NAME_UPDATE_NOT_ALLOWED(1012, "The current inlong group status does not support modifying the name"),
     GROUP_INFO_INCONSISTENT(1013, "The inlong group info is inconsistent, please contact the administrator"),
-    GROUP_MODE_UNSUPPORTED(1014, "The current inlong group mode only support lightweight, standard"),
+    GROUP_MODE_UNSUPPORTED(1014, "The current inlong group mode only support lightweight or standard"),
 
     OPT_NOT_ALLOWED_BY_STATUS(1021, "InlongGroup status %s was not allowed to add/update/delete related info"),
 
@@ -74,7 +74,7 @@ public enum ErrorCodeEnum {
     STREAM_EXT_SAVE_FAILED(1207, "Failed to save/update inlong stream extension information"),
     STREAM_FIELD_SAVE_FAILED(1208, "Failed to save/update inlong stream field"),
     STREAM_DELETE_HAS_SOURCE(1209, "The inlong stream contains source info and is not allowed to be deleted"),
-    STREAM_DELETE_HAS_SINK(1210, "The inlong stream contains data sink info and is not allowed to be deleted"),
+    STREAM_DELETE_HAS_SINK(1210, "The inlong stream contains sink info and is not allowed to be deleted"),
 
     SOURCE_TYPE_IS_NULL(1300, "Source type is null"),
     SOURCE_TYPE_NOT_SUPPORT(1301, "Source type '%s' not support"),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index de044f0aa..0109cdec1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -185,36 +185,34 @@ public class InlongGroupProcessService {
      * @return inlong group id
      */
     public String deleteProcessAsync(String groupId, String operator) {
-        LOGGER.info("begin to delete process asynchronously for groupId={} by operator={}", groupId, operator);
+        LOGGER.info("begin to delete group asynchronously for groupId={} by user={}", groupId, operator);
         EXECUTOR_SERVICE.execute(() -> {
             try {
                 invokeDeleteProcess(groupId, operator);
-            } catch (Exception ex) {
-                LOGGER.error("exception while delete process for groupId={} by operator={}", groupId, operator, ex);
-                throw ex;
+            } catch (Exception e) {
+                LOGGER.error(String.format("failed to async delete group for groupId=%s by %s", groupId, operator), e);
+                throw e;
             }
-            groupService.delete(groupId, operator);
         });
 
-        LOGGER.info("success to delete process asynchronously for groupId={} by operator={}", groupId, operator);
+        LOGGER.info("success to delete group asynchronously for groupId={} by user={}", groupId, operator);
         return groupId;
     }
 
     /**
-     * Delete InlongGroup logically and delete related resource in an asynchronous way.
+     * Delete InlongGroup logically and delete related resource in a synchronous way.
      */
-    public boolean deleteProcess(String groupId, String operator) {
-        LOGGER.info("begin to delete process for groupId={} by operator={}", groupId, operator);
+    public Boolean deleteProcess(String groupId, String operator) {
+        LOGGER.info("begin to delete group for groupId={} by user={}", groupId, operator);
         try {
             invokeDeleteProcess(groupId, operator);
-        } catch (Exception ex) {
-            LOGGER.error("exception while delete process for groupId={} by operator={}", groupId, operator, ex);
-            throw ex;
+        } catch (Exception e) {
+            LOGGER.error(String.format("failed to delete group for groupId=%s by user=%s", groupId, operator), e);
+            throw e;
         }
 
-        boolean result = groupService.delete(groupId, operator);
-        LOGGER.info("success to delete process for groupId={} by operator={}", groupId, operator);
-        return result;
+        LOGGER.info("success to delete group for groupId={} by user={}", groupId, operator);
+        return true;
     }
 
     /**
@@ -287,7 +285,9 @@ public class InlongGroupProcessService {
     }
 
     private void invokeDeleteProcess(String groupId, String operator) {
-        InlongGroupInfo groupInfo = groupService.get(groupId);
+        // check can be deleted
+        InlongGroupInfo groupInfo = groupService.doDeleteCheck(groupId, operator);
+        // start to delete group process
         GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.DELETE);
         workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, form);
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index 3c07faa28..58acbfb27 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -45,6 +45,14 @@ public interface InlongGroupService {
      */
     String save(InlongGroupRequest groupInfo, String operator);
 
+    /**
+     * Query whether the specified group id exists
+     *
+     * @param groupId the group id to be queried
+     * @return does it exist
+     */
+    Boolean exist(String groupId);
+
     /**
      * Get inlong group info based on inlong group id
      *
@@ -53,6 +61,31 @@ public interface InlongGroupService {
      */
     InlongGroupInfo get(String groupId);
 
+    /**
+     * Query the group information of each status of the current user
+     *
+     * @param operator name of operator
+     * @return inlong group status statistics
+     */
+    InlongGroupCountResponse countGroupByUser(String operator);
+
+    /**
+     * According to the group id, query the topic to which it belongs
+     *
+     * @param groupId Inlong group id
+     * @return Topic information
+     * @apiNote TubeMQ corresponds to the group, only 1 topic
+     */
+    InlongGroupTopicInfo getTopic(String groupId);
+
+    /**
+     * According to the group id, query the backup topic to which it belongs
+     *
+     * @param groupId inlong group id
+     * @return backup topic info
+     */
+    InlongGroupTopicInfo getBackupTopic(String groupId);
+
     /**
      * Paging query inlong group brief info list
      *
@@ -79,48 +112,26 @@ public interface InlongGroupService {
      * @param operator name of operator
      * @return whether succeed
      */
-    boolean updateStatus(String groupId, Integer status, String operator);
+    Boolean updateStatus(String groupId, Integer status, String operator);
 
     /**
-     * Delete the group information of the specified group id
+     * Check whether deletion is supported for the specified group.
      *
-     * @param groupId The group id that needs to be deleted
+     * @param groupId inlong group id
      * @param operator name of operator
-     * @return whether succeed
-     */
-    boolean delete(String groupId, String operator);
-
-    /**
-     * Query whether the specified group id exists
-     *
-     * @param groupId the group id to be queried
-     * @return does it exist
+     * @return inlong group info
      */
-    Boolean exist(String groupId);
+    InlongGroupInfo doDeleteCheck(String groupId, String operator);
 
     /**
-     * Query the group information of each status of the current user
+     * Delete the group information of the specified group id
      *
+     * @param groupId The group id that needs to be deleted
      * @param operator name of operator
-     * @return inlong group status statistics
-     */
-    InlongGroupCountResponse countGroupByUser(String operator);
-
-    /**
-     * According to the group id, query the topic to which it belongs
-     *
-     * @param groupId inlong group id
-     * @return topic info
-     */
-    InlongGroupTopicInfo getTopic(String groupId);
-
-    /**
-     * According to the group id, query the backup topic to which it belongs
-     *
-     * @param groupId inlong group id
-     * @return backup topic info
+     * @return whether succeed
+     * @apiNote Before invoking this delete method, you must
      */
-    InlongGroupTopicInfo getBackupTopic(String groupId);
+    Boolean delete(String groupId, String operator);
 
     /**
      * Save the group modified when the approval is passed
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 6a5839065..9c143fd67 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -118,7 +117,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
      * @param request request of updated
      * @param operator current operator
      */
-    private static void checkGroupCanUpdate(InlongGroupEntity entity, InlongGroupRequest request, String operator) {
+    private static void doUpdateCheck(InlongGroupEntity entity, InlongGroupRequest request, String operator) {
         if (entity == null || request == null) {
             return;
         }
@@ -146,8 +145,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         }
     }
 
-    @Transactional(rollbackFor = Throwable.class)
     @Override
+    @Transactional(rollbackFor = Throwable.class)
     public String save(InlongGroupRequest request, String operator) {
         LOGGER.debug("begin to save inlong group={} by user={}", request, operator);
         Preconditions.checkNotNull(request, "inlong group request cannot be empty");
@@ -158,6 +157,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
             LOGGER.error("groupId {} has already exists", groupId);
             throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
         }
+
         request.setEnableZookeeper(enableZookeeper ? InlongConstants.ENABLE_ZK : InlongConstants.DISABLE_ZK);
         InlongGroupOperator instance = groupOperatorFactory.getInstance(request.getMqType());
         groupId = instance.saveOpt(request, operator);
@@ -169,6 +169,14 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         return groupId;
     }
 
+    @Override
+    public Boolean exist(String groupId) {
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+        LOGGER.debug("success to check inlong group {}, exist? {}", groupId, entity != null);
+        return entity != null;
+    }
+
     @Override
     public InlongGroupInfo get(String groupId) {
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
@@ -192,6 +200,75 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         return groupInfo;
     }
 
+    @Override
+    public InlongGroupCountResponse countGroupByUser(String operator) {
+        InlongGroupCountResponse countVO = new InlongGroupCountResponse();
+        List<Map<String, Object>> statusCount = groupMapper.countGroupByUser(operator);
+        for (Map<String, Object> map : statusCount) {
+            int status = (Integer) map.get("status");
+            long count = (Long) map.get("count");
+            countVO.setTotalCount(countVO.getTotalCount() + count);
+            if (status == GroupStatus.CONFIG_ING.getCode()) {
+                countVO.setWaitAssignCount(countVO.getWaitAssignCount() + count);
+            } else if (status == GroupStatus.TO_BE_APPROVAL.getCode()) {
+                countVO.setWaitApproveCount(countVO.getWaitApproveCount() + count);
+            } else if (status == GroupStatus.APPROVE_REJECTED.getCode()) {
+                countVO.setRejectCount(countVO.getRejectCount() + count);
+            }
+        }
+
+        LOGGER.debug("success to count inlong group for operator={}", operator);
+        return countVO;
+    }
+
+    @Override
+    public InlongGroupTopicInfo getTopic(String groupId) {
+        // the group info will not null in get() method
+        InlongGroupInfo groupInfo = this.get(groupId);
+        InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType());
+        InlongGroupTopicInfo topicInfo = groupOperator.getTopic(groupInfo);
+
+        // set the base params
+        topicInfo.setInlongGroupId(groupId);
+        String clusterTag = groupInfo.getInlongClusterTag();
+        topicInfo.setInlongClusterTag(clusterTag);
+
+        // assert: each MQ type has a corresponding type of cluster
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, groupInfo.getMqType());
+        topicInfo.setClusterInfos(clusterInfos);
+
+        LOGGER.debug("success to get topic for groupId={}, result={}", groupId, topicInfo);
+        return topicInfo;
+    }
+
+    @Override
+    public InlongGroupTopicInfo getBackupTopic(String groupId) {
+        // backup topic info saved in the ext table
+        InlongGroupExtEntity extEntity = groupExtMapper.selectByUniqueKey(groupId, BACKUP_CLUSTER_TAG);
+        if (extEntity == null || StringUtils.isBlank(extEntity.getKeyValue())) {
+            LOGGER.warn("not found any backup topic for groupId={}", groupId);
+            return null;
+        }
+
+        // the group info will not null in get() method
+        InlongGroupInfo groupInfo = this.get(groupId);
+        InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType());
+        InlongGroupTopicInfo backupTopicInfo = groupOperator.getBackupTopic(groupInfo);
+
+        // set the base params
+        backupTopicInfo.setInlongGroupId(groupId);
+        String backupClusterTag = extEntity.getKeyValue();
+        backupTopicInfo.setInlongClusterTag(backupClusterTag);
+
+        // set backup cluster info
+        // assert: each MQ type has a corresponding type of cluster
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(backupClusterTag, groupInfo.getMqType());
+        backupTopicInfo.setClusterInfos(clusterInfos);
+
+        LOGGER.debug("success to get backup topic for groupId={}, result={}", groupId, backupTopicInfo);
+        return backupTopicInfo;
+    }
+
     @Override
     public PageResult<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request) {
         if (request.getPageSize() > MAX_PAGE_SIZE) {
@@ -231,7 +308,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
+    @Transactional(rollbackFor = Throwable.class,
+            isolation = Isolation.REPEATABLE_READ,
             propagation = Propagation.REQUIRES_NEW)
     public String update(InlongGroupRequest request, String operator) {
         LOGGER.debug("begin to update inlong group={} by user={}", request, operator);
@@ -248,7 +326,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
         // check whether the current status can be modified
-        checkGroupCanUpdate(entity, request, operator);
+        doUpdateCheck(entity, request, operator);
 
         request.setEnableZookeeper(enableZookeeper ? InlongConstants.ENABLE_ZK : InlongConstants.DISABLE_ZK);
         InlongGroupOperator instance = groupOperatorFactory.getInstance(request.getMqType());
@@ -264,7 +342,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     @Override
     @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
             propagation = Propagation.REQUIRES_NEW)
-    public boolean updateStatus(String groupId, Integer status, String operator) {
+    public Boolean updateStatus(String groupId, Integer status, String operator) {
         LOGGER.info("begin to update group status to [{}] for groupId={} by user={}", status, groupId, operator);
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         InlongGroupEntity entity = groupMapper.selectByGroupIdForUpdate(groupId);
@@ -287,137 +365,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         return true;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class)
-    public boolean delete(String groupId, String operator) {
-        LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator);
-        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
-
-        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
-        if (entity == null) {
-            LOGGER.error("inlong group not found by groupId={}", groupId);
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
-
-        // Determine whether the current status can be deleted
-        GroupStatus curState = GroupStatus.forCode(entity.getStatus());
-        if (GroupStatus.notAllowedTransition(curState, GroupStatus.DELETED)) {
-            String errMsg = String.format("Current status=%s was not allowed to delete", curState);
-            LOGGER.error(errMsg);
-            throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, errMsg);
-        }
-
-        /*
-         If the status allowed logic delete, all associated data can be logically deleted.
-         In other status, you need to delete the related "inlong stream" first.
-         When deleting a related inlong stream, you also need to check whether
-         there are some related "stream source" and "stream sink"
-         */
-        if (GroupStatus.allowedLogicDelete(curState)) {
-            streamService.logicDeleteAll(entity.getInlongGroupId(), operator);
-        } else {
-            int count = streamService.selectCountByGroupId(groupId);
-            if (count >= 1) {
-                LOGGER.error("groupId={} have [{}] inlong streams, deleted failed", groupId, count);
-                throw new BusinessException(ErrorCodeEnum.GROUP_HAS_STREAM);
-            }
-        }
-
-        // update the group after deleting related info
-        entity.setIsDeleted(entity.getId());
-        entity.setStatus(GroupStatus.DELETED.getCode());
-        entity.setModifier(operator);
-        int rowCount = groupMapper.updateByIdentifierSelective(entity);
-        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            LOGGER.error("inlong group has already updated with group id={}, curVersion={}",
-                    entity.getInlongGroupId(), entity.getVersion());
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
-
-        // logically delete the associated extension info
-        groupExtMapper.logicDeleteAllByGroupId(groupId);
-
-        LOGGER.info("success to delete group and group ext property for groupId={} by user={}", groupId, operator);
-        return true;
-    }
-
-    @Override
-    public Boolean exist(String groupId) {
-        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
-        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
-        LOGGER.debug("success to check inlong group {}, exist? {}", groupId, entity != null);
-        return entity != null;
-    }
-
-    @Override
-    public InlongGroupCountResponse countGroupByUser(String operator) {
-        InlongGroupCountResponse countVO = new InlongGroupCountResponse();
-        List<Map<String, Object>> statusCount = groupMapper.countGroupByUser(operator);
-        for (Map<String, Object> map : statusCount) {
-            int status = (Integer) map.get("status");
-            long count = (Long) map.get("count");
-            countVO.setTotalCount(countVO.getTotalCount() + count);
-            if (status == GroupStatus.CONFIG_ING.getCode()) {
-                countVO.setWaitAssignCount(countVO.getWaitAssignCount() + count);
-            } else if (status == GroupStatus.TO_BE_APPROVAL.getCode()) {
-                countVO.setWaitApproveCount(countVO.getWaitApproveCount() + count);
-            } else if (status == GroupStatus.APPROVE_REJECTED.getCode()) {
-                countVO.setRejectCount(countVO.getRejectCount() + count);
-            }
-        }
-
-        LOGGER.debug("success to count inlong group for operator={}", operator);
-        return countVO;
-    }
-
-    @Override
-    public InlongGroupTopicInfo getTopic(String groupId) {
-        // the group info will not null in get() method
-        InlongGroupInfo groupInfo = this.get(groupId);
-        InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType());
-        InlongGroupTopicInfo topicInfo = groupOperator.getTopic(groupInfo);
-
-        // set the base params
-        topicInfo.setInlongGroupId(groupId);
-        String clusterTag = groupInfo.getInlongClusterTag();
-        topicInfo.setInlongClusterTag(clusterTag);
-
-        // assert: each MQ type has a corresponding type of cluster
-        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, groupInfo.getMqType());
-        topicInfo.setClusterInfos(clusterInfos);
-
-        LOGGER.debug("success to get topic for groupId={}, result={}", groupId, topicInfo);
-        return topicInfo;
-    }
-
-    @Override
-    public InlongGroupTopicInfo getBackupTopic(String groupId) {
-        // backup topic info saved in the ext table
-        InlongGroupExtEntity extEntity = groupExtMapper.selectByUniqueKey(groupId, BACKUP_CLUSTER_TAG);
-        if (extEntity == null || StringUtils.isBlank(extEntity.getKeyValue())) {
-            LOGGER.warn("not found any backup topic for groupId={}", groupId);
-            return null;
-        }
-
-        // the group info will not null in get() method
-        InlongGroupInfo groupInfo = this.get(groupId);
-        InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType());
-        InlongGroupTopicInfo backupTopicInfo = groupOperator.getBackupTopic(groupInfo);
-
-        // set the base params
-        backupTopicInfo.setInlongGroupId(groupId);
-        String backupClusterTag = extEntity.getKeyValue();
-        backupTopicInfo.setInlongClusterTag(backupClusterTag);
-
-        // set backup cluster info
-        // assert: each MQ type has a corresponding type of cluster
-        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(backupClusterTag, groupInfo.getMqType());
-        backupTopicInfo.setClusterInfos(clusterInfos);
-
-        LOGGER.debug("success to get backup topic for groupId={}, result={}", groupId, backupTopicInfo);
-        return backupTopicInfo;
-    }
-
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
     public void updateAfterApprove(InlongGroupApproveRequest approveRequest, String operator) {
@@ -427,10 +374,10 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         // only the [TO_BE_APPROVAL] status allowed the passing operation
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
-            throw new WorkflowListenerException("inlong group not found with group id=" + groupId);
+            throw new BusinessException("inlong group not found with group id=" + groupId);
         }
         if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) {
-            throw new WorkflowListenerException("inlong group status is [wait_approval], not allowed to approve again");
+            throw new BusinessException("inlong group status [wait_approval] not allowed to approve again");
         }
 
         // bind cluster tag and update status to [GROUP_APPROVE_PASSED]
@@ -468,6 +415,64 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         LOGGER.info("success to save or update inlong group ext for groupId={}", groupId);
     }
 
+    @Override
+    public InlongGroupInfo doDeleteCheck(String groupId, String operator) {
+        InlongGroupInfo groupInfo = this.get(groupId);
+        // only the person in charges can update
+        List<String> inCharges = Arrays.asList(groupInfo.getInCharges().split(InlongConstants.COMMA));
+        if (!inCharges.contains(operator)) {
+            LOGGER.error("user [{}] has no privilege for the inlong group", operator);
+            throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
+        }
+
+        // determine whether the current status can be deleted
+        GroupStatus curState = GroupStatus.forCode(groupInfo.getStatus());
+        if (GroupStatus.notAllowedTransition(curState, GroupStatus.DELETING)) {
+            String errMsg = String.format("current group status=%s was not allowed to delete", curState);
+            LOGGER.error(errMsg);
+            throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, errMsg);
+        }
+
+        // If the status allowed logic delete, all associated info will be logically deleted.
+        // In other status, you need to delete the related "inlong_stream" first.
+        if (!GroupStatus.allowedLogicDelete(curState)) {
+            int count = streamService.selectCountByGroupId(groupId);
+            if (count >= 1) {
+                LOGGER.error("groupId={} have [{}] inlong streams, deleted failed", groupId, count);
+                throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_HAS_STREAM);
+            }
+        }
+
+        return groupInfo;
+    }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Boolean delete(String groupId, String operator) {
+        LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator);
+        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
+
+        entity.setIsDeleted(entity.getId());
+        entity.setStatus(GroupStatus.DELETED.getCode());
+        entity.setModifier(operator);
+        int rowCount = groupMapper.updateByIdentifierSelective(entity);
+        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error("inlong group has already updated for groupId={} curVersion={}", groupId, entity.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+
+        // logically delete the associated extension info
+        groupExtMapper.logicDeleteAllByGroupId(groupId);
+
+        if (GroupStatus.allowedLogicDelete(GroupStatus.forCode(entity.getStatus()))) {
+            streamService.logicDeleteAll(groupId, operator);
+        }
+
+        LOGGER.info("success to delete group and group ext property for groupId={} by user={}", groupId, operator);
+        return true;
+    }
+
     private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
         Map<String, String> extMap = new HashMap<>();
         extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(), extInfo.getKeyValue()));
@@ -482,7 +487,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
             case USER_DEFINED:
                 return createUserDefinedSortConfig(extMap);
             default:
-                LOGGER.warn("Unsupported sort config for sortType:{}", sortType);
+                LOGGER.warn("unsupported sort config for sortType: {}", sortType);
                 return null;
         }
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
index 8a4116100..4c76c62bf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
@@ -58,23 +59,26 @@ public class UpdateGroupCompleteListener implements ProcessEventListener {
         log.info("begin to execute UpdateGroupCompleteListener for groupId={}, operateType={}", groupId, operateType);
 
         // update inlong group status and other configs
+        InlongGroupInfo groupInfo = form.getGroupInfo();
+        InlongGroupRequest groupRequest = groupInfo.genRequest();
         String operator = context.getOperator();
         switch (operateType) {
             case SUSPEND:
                 groupService.updateStatus(groupId, GroupStatus.SUSPENDED.getCode(), operator);
+                groupService.update(groupRequest, operator);
                 break;
             case RESTART:
                 groupService.updateStatus(groupId, GroupStatus.RESTARTED.getCode(), operator);
+                groupService.update(groupRequest, operator);
                 break;
             case DELETE:
-                groupService.updateStatus(groupId, GroupStatus.DELETED.getCode(), operator);
+                // delete process completed, then delete the group info
+                groupService.delete(groupId, operator);
                 break;
             default:
                 log.warn("unsupported operate={} for inlong group", operateType);
                 break;
         }
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        groupService.update(groupInfo.genRequest(), operator);
 
         // if the inlong group is lightweight mode, the stream source needs to be processed.
         if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java
index 2741c50d4..39f02657c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java
@@ -51,6 +51,7 @@ public class UpdateGroupFailedListener implements ProcessEventListener {
 
         // update inlong group status and other info
         String operator = context.getOperator();
+        // delete process failed, then change the group status to [CONFIG_FAILED]
         groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), operator);
         groupService.update(form.getGroupInfo().genRequest(), operator);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index bdbdef4d4..3d17b3972 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -43,6 +43,15 @@ public interface InlongStreamService {
      */
     Integer save(InlongStreamRequest request, String operator);
 
+    /**
+     * Query whether the inlong stream ID exists
+     *
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @return true: exists, false: does not exist
+     */
+    Boolean exist(String groupId, String streamId);
+
     /**
      * Query the details of the specified inlong stream
      *
@@ -60,15 +69,6 @@ public interface InlongStreamService {
      */
     List<InlongStreamInfo> list(String groupId);
 
-    /**
-     * Query whether the inlong stream ID exists
-     *
-     * @param groupId inlong group id
-     * @param streamId inlong stream id
-     * @return true: exists, false: does not exist
-     */
-    Boolean exist(String groupId, String streamId);
-
     /**
      * Paging query inlong stream brief info list
      *
@@ -103,7 +103,10 @@ public interface InlongStreamService {
     Boolean update(InlongStreamRequest request, String operator);
 
     /**
-     * Delete the specified inlong stream
+     * Delete the specified inlong stream.
+     * <p/>
+     * When deleting an inlong stream, you need to check whether there are some related
+     * stream_sources or stream_sinks
      *
      * @param groupId Inlong group id
      * @param streamId Inlong stream id
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 633d46fca..043df26df 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -126,6 +126,14 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         return streamEntity.getId();
     }
 
+    @Override
+    public Boolean exist(String groupId, String streamId) {
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+        return streamEntity != null;
+    }
+
     @Override
     public InlongStreamInfo get(String groupId, String streamId) {
         LOGGER.debug("begin to get inlong stream by groupId={}, streamId={}", groupId, streamId);
@@ -183,14 +191,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         return streamList;
     }
 
-    @Override
-    public Boolean exist(String groupId, String streamId) {
-        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
-        Preconditions.checkNotNull(groupId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
-        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
-        return streamEntity != null;
-    }
-
     /**
      * Query and set the extended information and data source fields of the inlong stream
      */
@@ -307,7 +307,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
         // Check whether the current inlong group status supports modification
-        this.checkCanUpdate(inlongGroupEntity.getStatus(), streamEntity, request);
+        this.doUpdateCheck(inlongGroupEntity.getStatus(), streamEntity, request);
 
         CommonBeanUtils.copyProperties(request, streamEntity, true);
         streamEntity.setModifier(operator);
@@ -326,8 +326,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         return true;
     }
 
-    @Transactional(rollbackFor = Throwable.class)
     @Override
+    @Transactional(rollbackFor = Throwable.class)
     public Boolean delete(String groupId, String streamId, String operator) {
         LOGGER.debug("begin to delete inlong stream, groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
@@ -580,7 +580,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
      * @param streamEntity Original inlong stream entity
      * @param request New inlong stream information
      */
-    private void checkCanUpdate(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) {
+    private void doUpdateCheck(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) {
         if (streamEntity == null || request == null) {
             return;
         }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 959d005e4..e3375618c 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -68,6 +68,13 @@ public class InlongGroupController {
         return Response.success(groupService.save(groupRequest, operator));
     }
 
+    @RequestMapping(value = "/group/exist/{groupId}", method = RequestMethod.GET)
+    @ApiOperation(value = "Is the inlong group id exists")
+    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
+    public Response<Boolean> exist(@PathVariable String groupId) {
+        return Response.success(groupService.exist(groupId));
+    }
+
     @RequestMapping(value = "/group/get/{groupId}", method = RequestMethod.GET)
     @ApiOperation(value = "Get inlong group")
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
@@ -75,6 +82,25 @@ public class InlongGroupController {
         return Response.success(groupService.get(groupId));
     }
 
+    @RequestMapping(value = "/group/countByStatus", method = RequestMethod.GET)
+    @ApiOperation(value = "Count inlong group status for current user")
+    public Response<InlongGroupCountResponse> countGroupByUser() {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        return Response.success(groupService.countGroupByUser(operator));
+    }
+
+    @GetMapping(value = "/group/getTopic/{groupId}")
+    @ApiOperation(value = "Get topic info")
+    public Response<InlongGroupTopicInfo> getTopic(@PathVariable String groupId) {
+        return Response.success(groupService.getTopic(groupId));
+    }
+
+    @GetMapping(value = "/group/getBackupTopic/{groupId}")
+    @ApiOperation(value = "Get backup topic info")
+    public Response<InlongGroupTopicInfo> getBackupTopic(@PathVariable String groupId) {
+        return Response.success(groupService.getBackupTopic(groupId));
+    }
+
     @RequestMapping(value = "/group/list", method = RequestMethod.POST)
     @ApiOperation(value = "List inlong groups by paginating")
     public Response<PageResult<InlongGroupBriefInfo>> listBrief(@RequestBody InlongGroupPageRequest request) {
@@ -91,30 +117,22 @@ public class InlongGroupController {
         return Response.success(groupService.update(groupRequest, operator));
     }
 
-    @RequestMapping(value = "/group/exist/{groupId}", method = RequestMethod.GET)
-    @ApiOperation(value = "Is the inlong group id exists")
+    @RequestMapping(value = "/group/delete/{groupId}", method = RequestMethod.DELETE)
+    @ApiOperation(value = "Delete inlong group info")
+    @OperationLog(operation = OperationType.DELETE)
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
-    public Response<Boolean> exist(@PathVariable String groupId) {
-        return Response.success(groupService.exist(groupId));
-    }
-
-    @RequestMapping(value = "/group/countByStatus", method = RequestMethod.GET)
-    @ApiOperation(value = "Count inlong group status for current user")
-    public Response<InlongGroupCountResponse> countGroupByUser() {
+    public Response<Boolean> delete(@PathVariable String groupId) {
         String operator = LoginUserUtils.getLoginUser().getName();
-        return Response.success(groupService.countGroupByUser(operator));
-    }
-
-    @GetMapping(value = "/group/getTopic/{groupId}")
-    @ApiOperation(value = "Get topic info")
-    public Response<InlongGroupTopicInfo> getTopic(@PathVariable String groupId) {
-        return Response.success(groupService.getTopic(groupId));
+        return Response.success(groupProcessOperation.deleteProcess(groupId, operator));
     }
 
-    @GetMapping(value = "/group/getBackupTopic/{groupId}")
-    @ApiOperation(value = "Get backup topic info")
-    public Response<InlongGroupTopicInfo> getBackupTopic(@PathVariable String groupId) {
-        return Response.success(groupService.getBackupTopic(groupId));
+    @RequestMapping(value = "/group/deleteAsync/{groupId}", method = RequestMethod.DELETE)
+    @ApiOperation(value = "Delete inlong group info")
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
+    public Response<String> deleteAsync(@PathVariable String groupId) {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        return Response.success(groupProcessOperation.deleteProcessAsync(groupId, operator));
     }
 
     @RequestMapping(value = "/group/startProcess/{groupId}", method = RequestMethod.POST)
@@ -141,15 +159,6 @@ public class InlongGroupController {
         return Response.success(groupProcessOperation.restartProcess(groupId, operator));
     }
 
-    @RequestMapping(value = "/group/delete/{groupId}", method = RequestMethod.DELETE)
-    @ApiOperation(value = "Delete inlong group info")
-    @OperationLog(operation = OperationType.DELETE)
-    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
-    public Response<Boolean> delete(@PathVariable String groupId) {
-        String operator = LoginUserUtils.getLoginUser().getName();
-        return Response.success(groupProcessOperation.deleteProcess(groupId, operator));
-    }
-
     @RequestMapping(value = "/group/suspendProcessAsync/{groupId}", method = RequestMethod.POST)
     @ApiOperation(value = "Suspend inlong group process")
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class)
@@ -166,15 +175,6 @@ public class InlongGroupController {
         return Response.success(groupProcessOperation.restartProcessAsync(groupId, operator));
     }
 
-    @RequestMapping(value = "/group/deleteAsync/{groupId}", method = RequestMethod.DELETE)
-    @ApiOperation(value = "Delete inlong group info")
-    @OperationLog(operation = OperationType.DELETE)
-    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
-    public Response<String> deleteAsync(@PathVariable String groupId) {
-        String operator = LoginUserUtils.getLoginUser().getName();
-        return Response.success(groupProcessOperation.deleteProcessAsync(groupId, operator));
-    }
-
     @PostMapping(value = "/group/reset")
     @ApiOperation(value = "Reset group status when group is in CONFIG_ING|SUSPENDING|RESTARTING|DELETING")
     public Response<Boolean> reset(@RequestBody @Validated InlongGroupResetRequest request) {