You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/02 08:54:13 UTC

[incubator-inlong] branch master updated: [INLONG-4486][TubeMQ] Adjust the parameter requirements of group consume delete APIs (#4495)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 102df00e7 [INLONG-4486][TubeMQ] Adjust the parameter requirements of group consume delete APIs (#4495)
102df00e7 is described below

commit 102df00e75ff040b944847aefcdd7f91b4b64000
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Jun 2 16:54:08 2022 +0800

    [INLONG-4486][TubeMQ] Adjust the parameter requirements of group consume delete APIs (#4495)
---
 .../master/metamanage/DefaultMetaDataService.java  |  23 +-
 .../server/master/metamanage/MetaDataService.java  |  34 ++-
 .../metastore/dao/mapper/ConsumeCtrlMapper.java    |   4 +
 .../metastore/dao/mapper/MetaConfigMapper.java     |  26 ++-
 .../metastore/impl/AbsConsumeCtrlMapperImpl.java   |  62 +++++
 .../metastore/impl/AbsMetaConfigMapperImpl.java    |  21 +-
 .../web/handler/WebAdminGroupCtrlHandler.java      | 251 +++++++++++++++++----
 .../web/handler/WebGroupConsumeCtrlHandler.java    |  38 +++-
 8 files changed, 399 insertions(+), 60 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index 4347f77bc..f2c707280 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -29,7 +29,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
 import org.apache.inlong.tubemq.corebase.TokenConstants;
@@ -1065,16 +1064,38 @@ public class DefaultMetaDataService implements MetaDataService {
         return result.isSuccess();
     }
 
+    @Override
+    public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName,
+                                                                String topicName) {
+        return metaConfigMapper.getConsumeCtrlByGroupAndTopic(groupName, topicName);
+    }
+
     @Override
     public List<GroupConsumeCtrlEntity> getConsumeCtrlByTopic(String topicName) {
         return metaConfigMapper.getConsumeCtrlByTopic(topicName);
     }
 
+    @Override
+    public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopic(Set<String> topicSet) {
+        return metaConfigMapper.getConsumeCtrlByTopic(topicSet);
+    }
+
     @Override
     public Set<String> getDisableTopicByGroupName(String groupName) {
         return metaConfigMapper.getDisableTopicByGroupName(groupName);
     }
 
+    @Override
+    public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName) {
+        return metaConfigMapper.getConsumeCtrlByGroupName(groupName);
+    }
+
+    @Override
+    public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(
+            Set<String> groupSet) {
+        return metaConfigMapper.getConsumeCtrlByGroupName(groupSet);
+    }
+
     @Override
     public Map<String, List<GroupConsumeCtrlEntity>> getGroupConsumeCtrlConf(
             Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
index a38bda3ce..e6e382a5b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
@@ -21,7 +21,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.server.Server;
 import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
@@ -686,6 +685,15 @@ public interface MetaDataService extends Server {
                                String groupName, String topicName,
                                StringBuilder strBuff, ProcessResult result);
 
+    /**
+     * Get the consume control record of a specific group and topic
+     *
+     * @param groupName  the queried group name
+     * @param topicName  the queried topic name
+     * @return  the matched consume control record; callers check it for null
+     */
+    GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName);
+
     /**
      * Get all group consume control record for a specific topic
      *
@@ -694,6 +702,14 @@ public interface MetaDataService extends Server {
      */
     List<GroupConsumeCtrlEntity> getConsumeCtrlByTopic(String topicName);
 
+    /**
+     * Get all group consume control records for the specific topic set
+     *
+     * @param topicSet  the queried topic name set
+     * @return  the group consume control record lists, keyed by topic name
+     */
+    Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopic(Set<String> topicSet);
+
     /**
      * Get all disable consumed topic for a specific group
      *
@@ -702,6 +718,22 @@ public interface MetaDataService extends Server {
      */
     Set<String> getDisableTopicByGroupName(String groupName);
 
+    /**
+     * Get consume control records by group name
+     *
+     * @param groupName  the queried group name
+     * @return  the consume control record list
+     */
+    List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
+
+    /**
+     * Get consume control records by group name set
+     *
+     * @param groupSet  the queried group name set
+     * @return  the consume control record map
+     */
+    Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(Set<String> groupSet);
+
     /**
      * Get group consume control configure for topic & group set
      *
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
index 971e0885a..299d02b58 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
@@ -44,8 +44,12 @@ public interface ConsumeCtrlMapper extends AbstractMapper {
 
     List<GroupConsumeCtrlEntity> getConsumeCtrlByTopicName(String topicName);
 
+    Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopicName(Set<String> topicSet);
+
     List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
 
+    Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(Set<String> groupSet);
+
     GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName);
 
     Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
index 0b3b4cc83..0ce5456b7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
@@ -159,7 +159,7 @@ public interface MetaConfigMapper extends KeepAliveService {
      * Add or Update topic control configure info
      *
      * @param isAddOp  whether add operation
-     * @param entity   the topic control info entity will be add
+     * @param entity   the topic control info entity will be added
      * @param strBuff  the print info string buffer
      * @param result   the process result return
      * @return true if success otherwise false
@@ -517,6 +517,14 @@ public interface MetaConfigMapper extends KeepAliveService {
      */
     List<GroupConsumeCtrlEntity> getConsumeCtrlByTopic(String topicName);
 
+    /**
+     * Get all group consume control records for the specific topic set
+     *
+     * @param topicSet  the queried topic name set
+     * @return  the group consume control record lists, keyed by topic name
+     */
+    Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopic(Set<String> topicSet);
+
     /**
      * Get consume control record
      *
@@ -534,6 +542,22 @@ public interface MetaConfigMapper extends KeepAliveService {
      */
     Set<String> getDisableTopicByGroupName(String groupName);
 
+    /**
+     * Get consume control records by group name
+     *
+     * @param groupName  the queried group name
+     * @return  the consume control record list
+     */
+    List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
+
+    /**
+     * Get consume control records by group name set
+     *
+     * @param groupSet  the queried group name set
+     * @return  the consume control record lists, keyed by group name
+     */
+    Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(Set<String> groupSet);
+
     /**
      * Get group consume control configure for topic & group set
      *
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
index dc8703b82..1dff57566 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
@@ -188,6 +188,37 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
         return result;
     }
 
+    @Override
+    public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopicName(
+            Set<String> topicSet) {
+        if (topicSet == null || topicSet.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        GroupConsumeCtrlEntity entity;
+        ConcurrentHashSet<String> keySet;
+        List<GroupConsumeCtrlEntity> itemRet;
+        Map<String, List<GroupConsumeCtrlEntity>> result = new HashMap<>();
+        for (String topicName : topicSet) {
+            keySet = topic2RecordCache.get(topicName);
+            if (keySet == null || keySet.isEmpty()) {
+                result.put(topicName, Collections.emptyList());
+                continue;
+            }
+            itemRet = new ArrayList<>();
+            for (String recordKey : keySet) {
+                if (recordKey == null) {
+                    continue;
+                }
+                entity = consumeCtrlCache.get(recordKey);
+                if (entity != null) {
+                    itemRet.add(entity);
+                }
+            }
+            result.put(topicName, itemRet);
+        }
+        return result;
+    }
+
     @Override
     public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName) {
         ConcurrentHashSet<String> keySet =
@@ -206,6 +237,37 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
         return result;
     }
 
+    @Override
+    public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(
+            Set<String> groupSet) {
+        if (groupSet == null || groupSet.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        GroupConsumeCtrlEntity entity;
+        ConcurrentHashSet<String> keySet;
+        List<GroupConsumeCtrlEntity> itemRet;
+        Map<String, List<GroupConsumeCtrlEntity>> result = new HashMap<>();
+        for (String groupName : groupSet) {
+            keySet = group2RecordCache.get(groupName);
+            if (keySet == null || keySet.isEmpty()) {
+                result.put(groupName, Collections.emptyList());
+                continue;
+            }
+            itemRet = new ArrayList<>();
+            for (String recordKey : keySet) {
+                if (recordKey == null) {
+                    continue;
+                }
+                entity = consumeCtrlCache.get(recordKey);
+                if (entity != null) {
+                    itemRet.add(entity);
+                }
+            }
+            result.put(groupName, itemRet);
+        }
+        return result;
+    }
+
     @Override
     public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(
             String groupName, String topicName) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index 90e14b39b..d2d39a033 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -26,12 +26,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.codec.binary.StringUtils;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.inlong.tubemq.server.common.statusdef.EnableStatus;
 import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
 import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
 import org.apache.inlong.tubemq.server.common.utils.RowLock;
@@ -1230,6 +1230,11 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
         return consumeCtrlMapper.getConsumeCtrlByTopicName(topicName);
     }
 
+    @Override
+    public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByTopic(Set<String> topicSet) {
+        return consumeCtrlMapper.getConsumeCtrlByTopicName(topicSet);
+    }
+
     @Override
     public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName) {
         return consumeCtrlMapper.getConsumeCtrlByGroupAndTopic(groupName, topicName);
@@ -1244,13 +1249,25 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
             return disTopicSet;
         }
         for (GroupConsumeCtrlEntity ctrlEntity : qryResult) {
-            if (ctrlEntity != null && !ctrlEntity.isEnableConsume()) {
+            if (ctrlEntity != null
+                    && ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_DISABLE) {
                 disTopicSet.add(ctrlEntity.getTopicName());
             }
         }
         return disTopicSet;
     }
 
+    @Override
+    public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName) {
+        return consumeCtrlMapper.getConsumeCtrlByGroupName(groupName);
+    }
+
+    @Override
+    public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupName(
+            Set<String> groupSet) {
+        return consumeCtrlMapper.getConsumeCtrlByGroupName(groupSet);
+    }
+
     @Override
     public Map<String, List<GroupConsumeCtrlEntity>> getGroupConsumeCtrlConf(
             Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 6187f9af9..e67688c3b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -27,8 +27,10 @@ import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.inlong.tubemq.server.common.statusdef.EnableStatus;
 import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
 import org.apache.inlong.tubemq.server.master.TMaster;
+import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
@@ -392,11 +394,21 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
         }
         Set<String> topicNameSet = (Set<String>) result.getRetData();
         // add black list records
+        GroupConsumeCtrlEntity ctrlEntity;
         List<GroupProcessResult> retInfoList = new ArrayList<>();
         for (String groupName : groupNameSet) {
             for (String topicName : topicNameSet) {
+                ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+                        groupName, topicName);
+                if (ctrlEntity != null
+                        && ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_DISABLE) {
+                    result.setFailResult(DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+                    retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+                    continue;
+                }
                 retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity, groupName,
-                        topicName, Boolean.FALSE, "Old API Set", null, null, sBuffer, result));
+                        topicName, Boolean.FALSE, "Old API add blacklist, disable consume",
+                        null, null, sBuffer, result));
             }
         }
         return buildRetInfo(retInfoList, sBuffer);
@@ -420,15 +432,26 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
         }
         BaseEntity opEntity = (BaseEntity) result.getRetData();
         // check and get groupNameJsonSet info
-        if (!getGroupCsmJsonSetInfo(req, opEntity, Boolean.FALSE, sBuffer, result)) {
+        if (!getGroupCsmJsonSetInfo(req, opEntity, Boolean.FALSE,
+                "Old API batch set BlackList", sBuffer, result)) {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
         Map<String, GroupConsumeCtrlEntity> addRecordMap =
                 (Map<String, GroupConsumeCtrlEntity>) result.getRetData();
         // add or update and build result
+        GroupConsumeCtrlEntity ctrlEntity;
         List<GroupProcessResult> retInfoList = new ArrayList<>();
         for (GroupConsumeCtrlEntity entry : addRecordMap.values()) {
+            ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+                    entry.getGroupName(), entry.getTopicName());
+            if (ctrlEntity != null
+                    && ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_DISABLE) {
+                result.setFailResult(DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+                retInfoList.add(new GroupProcessResult(entry.getGroupName(),
+                        entry.getTopicName(), result));
+                continue;
+            }
             retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(entry, sBuffer, result));
         }
         return buildRetInfo(retInfoList, sBuffer);
@@ -451,27 +474,58 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
             return sBuffer;
         }
         BaseEntity opEntity = (BaseEntity) result.getRetData();
-        // get group list
+        // check and get topicName field
         if (!WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+                WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
-        Set<String> groupNameSet = (Set<String>) result.getRetData();
-        // check and get topicName field
+        Set<String> topicNameSet = (Set<String>) result.getRetData();
+        // get group list
         if (!WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
+                WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
-        Set<String> topicNameSet = (Set<String>) result.getRetData();
+        Set<String> groupNameSet = (Set<String>) result.getRetData();
         // add allowed consume records
         List<GroupProcessResult> retInfoList = new ArrayList<>();
-        for (String groupName : groupNameSet) {
-            for (String topicName : topicNameSet) {
-                retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity, groupName,
-                        topicName, Boolean.TRUE, "enable consume", null, null, sBuffer, result));
-
+        if (groupNameSet.isEmpty()) {
+            Map<String, List<GroupConsumeCtrlEntity>> topicConsumeCtrlMap =
+                    defMetaDataService.getConsumeCtrlByGroupName(topicNameSet);
+            for (Map.Entry<String, List<GroupConsumeCtrlEntity>> entry :
+                    topicConsumeCtrlMap.entrySet()) {
+                if (!entry.getValue().isEmpty()) {
+                    for (GroupConsumeCtrlEntity ctrlEntity : entry.getValue()) {
+                        if (ctrlEntity != null
+                                && ctrlEntity.getConsumeEnable() != EnableStatus.STATUS_ENABLE) {
+                            defMetaDataService.insertConsumeCtrlInfo(opEntity,
+                                    ctrlEntity.getGroupName(), ctrlEntity.getTopicName(),
+                                    Boolean.TRUE, "Old API delete blacklist, enable consume",
+                                    null, null, sBuffer, result);
+                        }
+                    }
+                }
+                result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+                retInfoList.add(new GroupProcessResult("", entry.getKey(), result));
+            }
+        } else {
+            GroupConsumeCtrlEntity ctrlEntity;
+            for (String groupName : groupNameSet) {
+                for (String topicName : topicNameSet) {
+                    ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+                            groupName, topicName);
+                    if (ctrlEntity != null
+                            && ctrlEntity.getConsumeEnable() != EnableStatus.STATUS_ENABLE) {
+                        retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity,
+                                groupName, topicName, Boolean.TRUE,
+                                "Old API delete blacklist, enable consume",
+                                null, null, sBuffer, result));
+                    } else {
+                        result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+                        retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+                    }
+                }
             }
         }
         return buildRetInfo(retInfoList, sBuffer);
@@ -510,10 +564,21 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
         Set<String> topicNameSet = (Set<String>) result.getRetData();
         List<GroupProcessResult> retInfoList = new ArrayList<>();
         // add allowed consume records
+        GroupConsumeCtrlEntity ctrlEntity;
         for (String groupName : groupNameSet) {
             for (String topicName : topicNameSet) {
+                ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+                        groupName, topicName);
+                if (ctrlEntity != null) {
+                    if (ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_ENABLE) {
+                        result.setFailResult(DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+                        retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+                        continue;
+                    }
+                }
                 retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity, groupName,
-                        topicName, Boolean.TRUE, "enable consume", null, null, sBuffer, result));
+                        topicName, Boolean.TRUE, "Old API add, enable consume",
+                        null, null, sBuffer, result));
             }
         }
         return buildRetInfo(retInfoList, sBuffer);
@@ -537,15 +602,27 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
         }
         BaseEntity opEntity = (BaseEntity) result.getRetData();
         // check and get groupNameJsonSet info
-        if (!getGroupCsmJsonSetInfo(req, opEntity, Boolean.TRUE, sBuffer, result)) {
+        if (!getGroupCsmJsonSetInfo(req, opEntity, Boolean.TRUE,
+                "Old API batch set Enable Consume", sBuffer, result)) {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
         Map<String, GroupConsumeCtrlEntity> addRecordMap =
                 (Map<String, GroupConsumeCtrlEntity>) result.getRetData();
         // add or update and build result
+        GroupConsumeCtrlEntity ctrlEntity;
         List<GroupProcessResult> retInfoList = new ArrayList<>();
         for (GroupConsumeCtrlEntity entry : addRecordMap.values()) {
+            ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+                    entry.getGroupName(), entry.getTopicName());
+            if (ctrlEntity != null) {
+                if (ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_ENABLE) {
+                    result.setFailResult(DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+                    retInfoList.add(new GroupProcessResult(entry.getGroupName(),
+                            entry.getTopicName(), result));
+                    continue;
+                }
+            }
             retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(entry, sBuffer, result));
         }
         return buildRetInfo(retInfoList, sBuffer);
@@ -568,26 +645,57 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
             return sBuffer;
         }
         BaseEntity opEntity = (BaseEntity) result.getRetData();
-        // get group list
+        // check and get topicName field
         if (!WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+                WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
-        Set<String> groupNameSet = (Set<String>) result.getRetData();
-        // check and get topicName field
+        Set<String> topicNameSet = (Set<String>) result.getRetData();
+        // get group list
         if (!WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
+                WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
-        Set<String> topicNameSet = (Set<String>) result.getRetData();
+        Set<String> groupNameSet = (Set<String>) result.getRetData();
         List<GroupProcessResult> retInfoList = new ArrayList<>();
-        for (String groupName : groupNameSet) {
-            for (String topicName : topicNameSet) {
-                defMetaDataService.delConsumeCtrlConf(opEntity.getModifyUser(),
-                        groupName, topicName, sBuffer, result);
-                retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+        if (groupNameSet.isEmpty()) {
+            Map<String, List<GroupConsumeCtrlEntity>> topicConsumeCtrlMap =
+                    defMetaDataService.getConsumeCtrlByGroupName(topicNameSet);
+            for (Map.Entry<String, List<GroupConsumeCtrlEntity>> entry :
+                    topicConsumeCtrlMap.entrySet()) {
+                if (!entry.getValue().isEmpty()) {
+                    for (GroupConsumeCtrlEntity ctrlEntity : entry.getValue()) {
+                        if (ctrlEntity != null
+                                && ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_ENABLE) {
+                            defMetaDataService.insertConsumeCtrlInfo(opEntity,
+                                    ctrlEntity.getGroupName(), ctrlEntity.getTopicName(),
+                                    Boolean.FALSE, "Old API delete, disable consume",
+                                    null, null, sBuffer, result);
+                        }
+                    }
+                }
+                result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+                retInfoList.add(new GroupProcessResult("", entry.getKey(), result));
+            }
+        } else {
+            GroupConsumeCtrlEntity ctrlEntity;
+            for (String groupName : groupNameSet) {
+                for (String topicName : topicNameSet) {
+                    ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+                            groupName, topicName);
+                    if (ctrlEntity != null
+                            && ctrlEntity.getConsumeEnable() == EnableStatus.STATUS_ENABLE) {
+                        retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity,
+                                groupName, topicName, Boolean.FALSE,
+                                "Old API delete, disable consume",
+                                null, null, sBuffer, result));
+                    } else {
+                        result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+                        retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+                    }
+                }
             }
         }
         return buildRetInfo(retInfoList, sBuffer);
@@ -666,26 +774,60 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
             return sBuffer;
         }
         BaseEntity opEntity = (BaseEntity) result.getRetData();
-        // get group list
+        // check and get topicName field
         if (!WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+                WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
-        Set<String> groupNameSet = (Set<String>) result.getRetData();
-        // check and get topicName field
+        Set<String> topicNameSet = (Set<String>) result.getRetData();
+        // get group list
         if (!WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
+                WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
-        Set<String> topicNameSet = (Set<String>) result.getRetData();
+        Set<String> groupNameSet = (Set<String>) result.getRetData();
         List<GroupProcessResult> retInfoList = new ArrayList<>();
-        for (String groupName : groupNameSet) {
-            for (String topicName : topicNameSet) {
-                retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity,
-                        groupName, topicName, Boolean.TRUE, "enable consume",
-                        false, TServerConstants.BLANK_FILTER_ITEM_STR, sBuffer, result));
+        if (groupNameSet.isEmpty()) {
+            Map<String, List<GroupConsumeCtrlEntity>> topicConsumeCtrlMap =
+                    defMetaDataService.getConsumeCtrlByGroupName(topicNameSet);
+            for (Map.Entry<String, List<GroupConsumeCtrlEntity>> entry :
+                    topicConsumeCtrlMap.entrySet()) {
+                if (entry.getValue().isEmpty()) {
+                    result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+                    retInfoList.add(new GroupProcessResult("", entry.getKey(), result));
+                    continue;
+                }
+                for (GroupConsumeCtrlEntity ctrlEntity : entry.getValue()) {
+                    if (ctrlEntity != null
+                            && ctrlEntity.getFilterEnable() == EnableStatus.STATUS_ENABLE) {
+                        defMetaDataService.insertConsumeCtrlInfo(opEntity,
+                                ctrlEntity.getGroupName(), ctrlEntity.getTopicName(), null,
+                                "Old API delete, disable filter", false,
+                                TServerConstants.BLANK_FILTER_ITEM_STR, sBuffer, result);
+                    }
+                }
+                result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+                retInfoList.add(new GroupProcessResult("", entry.getKey(), result));
+            }
+        } else {
+            GroupConsumeCtrlEntity ctrlEntity;
+            for (String groupName : groupNameSet) {
+                for (String topicName : topicNameSet) {
+                    ctrlEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+                            groupName, topicName);
+                    if (ctrlEntity != null
+                            && ctrlEntity.getFilterEnable() == EnableStatus.STATUS_ENABLE) {
+                        retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity,
+                                groupName, topicName, null,
+                                "Old API delete, disable filter", false,
+                                TServerConstants.BLANK_FILTER_ITEM_STR, sBuffer, result));
+                    } else {
+                        result.setFullInfo(true, DataOpErrCode.DERR_SUCCESS.getCode(), "Ok");
+                        retInfoList.add(new GroupProcessResult(groupName, topicName, result));
+                    }
+                }
             }
         }
         return buildRetInfo(retInfoList, sBuffer);
@@ -971,13 +1113,22 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
             return sBuffer;
         }
         String filterCondStr = (String) result.getRetData();
+        // add or modify filter consume records
+        GroupConsumeCtrlEntity ctrlEntity;
         List<GroupProcessResult> retInfoList = new ArrayList<>();
-        // modify filter consume records
         for (String groupName : groupNameSet) {
             for (String topicName : topicNameSet) {
-                retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity,
-                        groupName, topicName, Boolean.TRUE, "enable consume",
-                        filterEnable, filterCondStr, sBuffer, result));
+                ctrlEntity =
+                        defMetaDataService.getConsumeCtrlByGroupAndTopic(groupName, topicName);
+                if (ctrlEntity == null) {
+                    retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity, groupName,
+                            topicName, Boolean.TRUE, "Old API set filter conditions",
+                            filterEnable, filterCondStr, sBuffer, result));
+                } else {
+                    retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(opEntity, groupName,
+                            topicName, null, "Old API set filter conditions",
+                            filterEnable, filterCondStr, sBuffer, result));
+                }
             }
         }
         return buildRetInfo(retInfoList, sBuffer);
@@ -1010,8 +1161,14 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
         Map<String, GroupConsumeCtrlEntity> addRecordMap =
                 (Map<String, GroupConsumeCtrlEntity>) result.getRetData();
         // add or update and build result
+        GroupConsumeCtrlEntity curEntity;
         List<GroupProcessResult> retInfoList = new ArrayList<>();
         for (GroupConsumeCtrlEntity entry : addRecordMap.values()) {
+            curEntity = defMetaDataService.getConsumeCtrlByGroupAndTopic(
+                    entry.getGroupName(), entry.getTopicName());
+            if (curEntity == null) {
+                entry.setConsumeEnable(true);
+            }
             retInfoList.add(defMetaDataService.insertConsumeCtrlInfo(entry, sBuffer, result));
         }
         return buildRetInfo(retInfoList, sBuffer);
@@ -1071,8 +1228,8 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
             itemEntity =
                     new GroupConsumeCtrlEntity(itemOpEntity, groupName, topicName);
             itemEntity.updModifyInfo(itemOpEntity.getDataVerId(),
-                    true, "enable consume", filterEnable, filterCondStr);
-            addRecordMap.put(itemEntity.getGroupName(), itemEntity);
+                    null, null, filterEnable, filterCondStr);
+            addRecordMap.put(itemEntity.getRecordKey(), itemEntity);
         }
         // check result
         if (addRecordMap.isEmpty()) {
@@ -1148,8 +1305,8 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
     }
 
     private boolean getGroupCsmJsonSetInfo(HttpServletRequest req, BaseEntity defOpEntity,
-                                           Boolean enableCsm, StringBuilder sBuffer,
-                                           ProcessResult result) {
+                                           Boolean enableCsm, String opReason,
+                                           StringBuilder sBuffer, ProcessResult result) {
         if (!WebParameterUtils.getJsonArrayParamValue(req,
                 WebFieldDef.GROUPJSONSET, true, null, result)) {
             return result.isSuccess();
@@ -1189,8 +1346,8 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
             itemEntity =
                     new GroupConsumeCtrlEntity(itemOpEntity, groupName, topicName);
             itemEntity.updModifyInfo(itemOpEntity.getDataVerId(),
-                    enableCsm, "Old API batch set", null, null);
-            addRecordMap.put(itemEntity.getGroupName(), itemEntity);
+                    enableCsm, opReason, null, null);
+            addRecordMap.put(itemEntity.getRecordKey(), itemEntity);
         }
         // check result
         if (addRecordMap.isEmpty()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index cffc25460..5432697c4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -27,6 +27,7 @@ import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
 import org.apache.inlong.tubemq.server.master.TMaster;
+import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
 
@@ -92,7 +93,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
-        Boolean consumeEnable = (Boolean) result.getRetData();
+        final Boolean consumeEnable = (Boolean) result.getRetData();
         // get filterEnable info
         if (!WebParameterUtils.getBooleanParamValue(req,
                 WebFieldDef.FILTERENABLE, false, null, sBuffer, result)) {
@@ -222,14 +223,36 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
         Set<String> topicNameSet = (Set<String>) result.getRetData();
         // execute delete operation
         List<GroupProcessResult> retInfo = new ArrayList<>();
-        for (String groupName : groupNameSet) {
-            for (String topicName : topicNameSet) {
-                defMetaDataService.delConsumeCtrlConf(opEntity.getModifyUser(),
-                        groupName, topicName, sBuffer, result);
-                retInfo.add(new GroupProcessResult(groupName, topicName, result));
+        if (topicNameSet.isEmpty()) {
+            Map<String, List<GroupConsumeCtrlEntity>> groupCtrlConsumeMap =
+                    defMetaDataService.getConsumeCtrlByGroupName(groupNameSet);
+            for (Map.Entry<String, List<GroupConsumeCtrlEntity>> entry :
+                    groupCtrlConsumeMap.entrySet()) {
+                if (entry.getValue().isEmpty()) {
+                    result.setFullInfo(true,
+                            DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+                    retInfo.add(new GroupProcessResult(entry.getKey(), "", result));
+                    continue;
+                }
+                for (GroupConsumeCtrlEntity ctrlEntity : entry.getValue()) {
+                    if (ctrlEntity != null) {
+                        defMetaDataService.delConsumeCtrlConf(opEntity.getModifyUser(),
+                                ctrlEntity.getGroupName(), ctrlEntity.getTopicName(), sBuffer, result);
+                    }
+                }
+                result.setFullInfo(true,
+                        DataOpErrCode.DERR_SUCCESS.getCode(), "Ok!");
+                retInfo.add(new GroupProcessResult(entry.getKey(), "", result));
+            }
+        } else {
+            for (String groupName : groupNameSet) {
+                for (String topicName : topicNameSet) {
+                    defMetaDataService.delConsumeCtrlConf(opEntity.getModifyUser(),
+                            groupName, topicName, sBuffer, result);
+                    retInfo.add(new GroupProcessResult(groupName, topicName, result));
+                }
             }
         }
-
         buildRetInfo(retInfo, sBuffer);
         return sBuffer;
     }
@@ -430,5 +453,4 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
         result.setSuccResult(addRecordMap);
         return result.isSuccess();
     }
-
 }