You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/19 08:14:17 UTC

[incubator-inlong] branch INLONG-570 updated: [INLONG-1444] Fix Web API multiple field search logic bug (#1445)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch INLONG-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-570 by this push:
     new 9e66f54  [INLONG-1444] Fix Web API multiple field search logic bug (#1445)
9e66f54 is described below

commit 9e66f5458cd50f1544677d36873618de7d9a75b2
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Aug 19 16:13:38 2021 +0800

    [INLONG-1444] Fix Web API multiple field search logic bug (#1445)
---
 .../server/master/metamanage/MetaDataManager.java  |  30 ++--
 .../metastore/BdbMetaStoreServiceImpl.java         |  20 ++-
 .../metamanage/metastore/MetaStoreService.java     |   8 +-
 .../metastore/dao/entity/TopicDeployEntity.java    |   2 +
 .../dao/mapper/GroupConsumeCtrlMapper.java         |   7 +-
 .../metastore/dao/mapper/TopicDeployMapper.java    |   4 +-
 .../impl/bdbimpl/BdbBrokerConfigMapperImpl.java    |  44 ++++--
 .../bdbimpl/BdbGroupConsumeCtrlMapperImpl.java     | 116 ++++++++-------
 .../impl/bdbimpl/BdbTopicDeployMapperImpl.java     | 159 ++++++++++-----------
 .../master/web/handler/WebTopicDeployHandler.java  |   2 +-
 10 files changed, 202 insertions(+), 190 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index 61e7927..98878d5 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -19,6 +19,7 @@ package org.apache.inlong.tubemq.server.master.metamanage;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -1184,7 +1185,9 @@ public class MetaDataManager implements Server {
     /**
      * Get broker topic entity, if query entity is null, return all topic entity
      *
-     * @param qryEntity query conditions
+     * @param topicNameSet  query by topicNameSet
+     * @param brokerIdSet   query by brokerIdSet
+     * @param qryEntity     query conditions
      * @return topic entity map
      */
     public Map<String, List<TopicDeployEntity>> getTopicConfEntityMap(Set<String> topicNameSet,
@@ -1202,11 +1205,14 @@ public class MetaDataManager implements Server {
      *
      * @return topic entity map
      */
-    public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<Integer> brokerIdSet,
-                                                                       Set<String> topicNameSet) {
+    public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+                                                                       Set<Integer> brokerIdSet) {
         Map<Integer, BrokerConfEntity> qryBrokerInfoMap =
                 metaStoreService.getBrokerConfInfo(brokerIdSet, null, null);
-        return metaStoreService.getTopicDeployInfoMap(qryBrokerInfoMap.keySet(), topicNameSet);
+        if (qryBrokerInfoMap.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        return metaStoreService.getTopicDeployInfoMap(topicNameSet, qryBrokerInfoMap.keySet());
     }
 
     public Map<String, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
@@ -1914,18 +1920,12 @@ public class MetaDataManager implements Server {
                                                             StringBuilder sBuffer,
                                                             ProcessResult result) {
         List<GroupProcessResult> retInfo = new ArrayList<>();
-        if ((groupNameSet == null || groupNameSet.isEmpty())
-                && (topicNameSet == null || topicNameSet.isEmpty())) {
+        Set<String> rmvRecordSet =
+                metaStoreService.getMatchedKeysByGroupAndTopicSet(groupNameSet, topicNameSet);
+        if (rmvRecordSet == null || rmvRecordSet.isEmpty()) {
             return retInfo;
         }
-        Set<String> rmvRecords = new HashSet<>();
-        if (groupNameSet != null && !groupNameSet.isEmpty()) {
-            rmvRecords.addAll(metaStoreService.getConsumeCtrlKeyByGroupName(groupNameSet));
-        }
-        if (topicNameSet != null && !topicNameSet.isEmpty()) {
-            rmvRecords.addAll(metaStoreService.getConsumeCtrlKeyByTopicName(topicNameSet));
-        }
-        for (String recKey : rmvRecords) {
+        for (String recKey : rmvRecordSet) {
             Tuple2<String, String> groupTopicTuple =
                     KeyBuilderUtils.splitRecKey2GroupTopic(recKey);
             metaStoreService.delGroupConsumeCtrlConf(operator, recKey, sBuffer, result);
@@ -1978,7 +1978,7 @@ public class MetaDataManager implements Server {
             return disTopicSet;
         }
         for (GroupConsumeCtrlEntity ctrlEntity : qryResult) {
-            if (!ctrlEntity.isEnableConsume()) {
+            if (ctrlEntity != null && !ctrlEntity.isEnableConsume()) {
                 disTopicSet.add(ctrlEntity.getTopicName());
             }
         }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
index 1c75ddc..756550e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
@@ -565,8 +565,8 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
 
     @Override
     public Map<Integer/* brokerId */, List<TopicDeployEntity>> getTopicDeployInfoMap(
-            Set<Integer> brokerIdSet,  Set<String> topicNameSet) {
-        return topicDeployMapper.getTopicDeployInfoMap(brokerIdSet, topicNameSet);
+            Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+        return topicDeployMapper.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
     }
 
 
@@ -901,16 +901,6 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
     }
 
     @Override
-    public Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicSet) {
-        return groupConsumeCtrlMapper.getConsumeCtrlKeyByTopicName(topicSet);
-    }
-
-    @Override
-    public Set<String> getConsumeCtrlKeyByGroupName(Set<String> groupSet) {
-        return groupConsumeCtrlMapper.getConsumeCtrlKeyByGroupName(groupSet);
-    }
-
-    @Override
     public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(
             String groupName, String topicName) {
         return groupConsumeCtrlMapper.getConsumeCtrlByGroupAndTopic(groupName, topicName);
@@ -928,6 +918,12 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
     }
 
     @Override
+    public Set<String> getMatchedKeysByGroupAndTopicSet(Set<String> groupSet,
+                                                        Set<String> topicSet) {
+        return groupConsumeCtrlMapper.getMatchedRecords(groupSet, topicSet);
+    }
+
+    @Override
     public void registerObserver(AliveObserver eventObserver) {
         if (eventObserver != null) {
             eventObservers.add(eventObserver);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 2e12188..052d014 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -168,7 +168,7 @@ public interface MetaStoreService extends KeepAlive, Server {
             Set<String> topicNameSet, Set<Integer> brokerIdSet, TopicDeployEntity qryEntity);
 
     Map<Integer/* brokerId */, List<TopicDeployEntity>> getTopicDeployInfoMap(
-            Set<Integer> brokerIdSet, Set<String> topicNameSet);
+            Set<String> topicNameSet, Set<Integer> brokerIdSet);
 
     Map<String/* topicName */, List<TopicDeployEntity>> getTopicDepInfoByTopicBrokerId(
             Set<String> topicSet, Set<Integer> brokerIdSet);
@@ -334,9 +334,7 @@ public interface MetaStoreService extends KeepAlive, Server {
 
     List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
 
-    Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicSet);
-
-    Set<String> getConsumeCtrlKeyByGroupName(Set<String> groupSet);
-
     List<GroupConsumeCtrlEntity> getGroupConsumeCtrlConf(GroupConsumeCtrlEntity qryEntity);
+
+    Set<String> getMatchedKeysByGroupAndTopicSet(Set<String> groupSet, Set<String> topicSet);
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index 4a65743..2d1df26 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -270,6 +270,8 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
         }
         return (target.getBrokerId() == TBaseConstants.META_VALUE_UNDEFINED
                 || target.getBrokerId() == this.brokerId)
+                && (target.getBrokerPort() == TBaseConstants.META_VALUE_UNDEFINED
+                || target.getBrokerPort() == this.brokerPort)
                 && (target.getTopicId() == TBaseConstants.META_VALUE_UNDEFINED
                 || target.getTopicId() == this.topicNameId)
                 && (TStringUtils.isBlank(target.getTopicName())
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
index fd04fd9..52d54a9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
@@ -20,7 +20,6 @@ package org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
 
@@ -48,10 +47,6 @@ public interface GroupConsumeCtrlMapper extends AbstractMapper {
 
     List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
 
-    Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicSet);
-
-    Set<String> getConsumeCtrlKeyByGroupName(Set<String> groupSet);
-
     GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName);
 
     Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(
@@ -59,4 +54,6 @@ public interface GroupConsumeCtrlMapper extends AbstractMapper {
 
     List<GroupConsumeCtrlEntity> getGroupConsumeCtrlConf(GroupConsumeCtrlEntity qryEntity);
 
+    Set<String> getMatchedRecords(Set<String> groupSet, Set<String> topicSet);
+
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index 3b69c5d..7896f57 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -59,8 +59,8 @@ public interface TopicDeployMapper extends AbstractMapper {
                                                          Set<Integer> brokerIdSet,
                                                          TopicDeployEntity qryEntity);
 
-    Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<Integer> brokerIdSet,
-                                                                Set<String> topicNameSet);
+    Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+                                                                Set<Integer> brokerIdSet);
 
     Map<String/* topicName */, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
             Set<String> topicSet, Set<Integer> brokerIdSet);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index f89c07d..4bc5c5b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -23,7 +23,6 @@ import com.sleepycat.persist.EntityCursor;
 import com.sleepycat.persist.EntityStore;
 import com.sleepycat.persist.PrimaryIndex;
 import com.sleepycat.persist.StoreConfig;
-
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -202,30 +201,51 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
 
     /**
      * get broker configure info from bdb store
+     *
+     * @param brokerIdSet  need matched broker id set
+     * @param brokerIpSet  need matched broker ip set
+     * @param qryEntity    need matched properties
      * @return result, only read
      */
     @Override
     public Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
                                                             Set<String> brokerIpSet,
                                                             BrokerConfEntity qryEntity) {
-        Set<Integer> qryBrokerKeySet = null;
+        Set<Integer> ipHitSet = null;
+        Set<Integer> totalMatchedSet = null;
         Map<Integer, BrokerConfEntity> retMap = new HashMap<>();
-        if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
-            qryBrokerKeySet = new HashSet<>(brokerIdSet);
-        }
+        // get records set by brokerIpSet
         if (brokerIpSet != null && !brokerIpSet.isEmpty()) {
-            if (qryBrokerKeySet == null) {
-                qryBrokerKeySet = new HashSet<>();
-            }
+            ipHitSet = new HashSet<>();
             for (String brokerIp : brokerIpSet) {
                 Integer brokerId = brokerIpIndexCache.get(brokerIp);
                 if (brokerId != null) {
-                    qryBrokerKeySet.add(brokerId);
+                    ipHitSet.add(brokerId);
+                }
+            }
+            if (ipHitSet.isEmpty()) {
+                return retMap;
+            }
+        }
+        // get intersection from brokerIdSet and brokerIpSet
+        if (brokerIdSet != null || ipHitSet != null) {
+            if (brokerIdSet == null) {
+                totalMatchedSet = new HashSet<>(ipHitSet);
+            } else {
+                if (ipHitSet == null) {
+                    totalMatchedSet = new HashSet<>(brokerIdSet);
+                } else {
+                    totalMatchedSet = new HashSet<>();
+                    for (Integer record : brokerIdSet) {
+                        if (ipHitSet.contains(record)) {
+                            totalMatchedSet.add(record);
+                        }
+                    }
                 }
             }
         }
         // get broker configures
-        if (qryBrokerKeySet == null) {
+        if (totalMatchedSet == null) {
             for (BrokerConfEntity entity :  brokerConfCache.values()) {
                 if (entity == null
                         || (qryEntity != null && !entity.isMatched(qryEntity))) {
@@ -234,7 +254,7 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
                 retMap.put(entity.getBrokerId(), entity);
             }
         } else {
-            for (Integer brokerId : qryBrokerKeySet) {
+            for (Integer brokerId : totalMatchedSet) {
                 BrokerConfEntity entity = brokerConfCache.get(brokerId);
                 if (entity == null
                         || (qryEntity != null && !entity.isMatched(qryEntity))) {
@@ -246,8 +266,6 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
         return retMap;
     }
 
-
-
     /**
      * get broker configure info from bdb store
      * @return result, only read
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
index 2791809..87c6fbb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.inlong.tubemq.corebase.utils.KeyBuilderUtils;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.server.common.exception.LoadMetaException;
 import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
 import org.apache.inlong.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
@@ -235,6 +236,9 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
         GroupConsumeCtrlEntity entity;
         List<GroupConsumeCtrlEntity> result = new ArrayList<>();
         for (String recordKey : keySet) {
+            if (recordKey == null) {
+                continue;
+            }
             entity = grpConsumeCtrlCache.get(recordKey);
             if (entity != null) {
                 result.add(entity);
@@ -262,34 +266,6 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
     }
 
     @Override
-    public Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicNameSet) {
-        ConcurrentHashSet<String> qrySet;
-        Set<String> retResult = new HashSet<>();
-        for (String topicName : topicNameSet) {
-            qrySet = grpConsumeCtrlTopicCache.get(topicName);
-            if (qrySet == null || qrySet.isEmpty()) {
-                continue;
-            }
-            retResult.addAll(qrySet);
-        }
-        return retResult;
-    }
-
-    @Override
-    public Set<String> getConsumeCtrlKeyByGroupName(Set<String> groupNameSet) {
-        ConcurrentHashSet<String> qrySet;
-        Set<String> retResult = new HashSet<>();
-        for (String groupName : groupNameSet) {
-            qrySet = grpConsumeCtrlGroupCache.get(groupName);
-            if (qrySet == null || qrySet.isEmpty()) {
-                continue;
-            }
-            retResult.addAll(qrySet);
-        }
-        return retResult;
-    }
-
-    @Override
     public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(
             String groupName, String topicName) {
         String recKey = KeyBuilderUtils.buildGroupTopicRecKey(groupName, topicName);
@@ -299,35 +275,13 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
     @Override
     public Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(
             Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
-        ConcurrentHashSet<String> recSet;
-        Set<String> qryHitRecSet = null;
         Map<String, List<GroupConsumeCtrlEntity>> retEntityMap = new HashMap<>();
-        // filter group items
-        if (groupSet != null && !groupSet.isEmpty()) {
-            qryHitRecSet = new HashSet<>();
-            for (String group : groupSet) {
-                recSet = grpConsumeCtrlGroupCache.get(group);
-                if (recSet != null && !recSet.isEmpty()) {
-                    qryHitRecSet.addAll(recSet);
-                }
-            }
-        }
-        // filter topic items
-        if (topicSet != null && !topicSet.isEmpty()) {
-            if (qryHitRecSet == null) {
-                qryHitRecSet = new HashSet<>();
-            }
-            for (String topic : topicSet) {
-                recSet = grpConsumeCtrlTopicCache.get(topic);
-                if (recSet != null && !recSet.isEmpty()) {
-                    qryHitRecSet.addAll(recSet);
-                }
-            }
-        }
+        // filter matched keys by groupSet and topicSet
+        Set<String> totalMatchedSet = getMatchedRecords(groupSet, topicSet);
         // get matched records
         GroupConsumeCtrlEntity tmpEntity;
         List<GroupConsumeCtrlEntity> itemLst;
-        if (qryHitRecSet == null) {
+        if (totalMatchedSet == null) {
             for (GroupConsumeCtrlEntity entity : grpConsumeCtrlCache.values()) {
                 if (entity == null || (qryEntry != null && !entity.isMatched(qryEntry))) {
                     continue;
@@ -337,7 +291,7 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
                 itemLst.add(entity);
             }
         } else {
-            for (String recKey : qryHitRecSet) {
+            for (String recKey : totalMatchedSet) {
                 tmpEntity = grpConsumeCtrlCache.get(recKey);
                 if (tmpEntity == null || (qryEntry != null && !tmpEntity.isMatched(qryEntry))) {
                     continue;
@@ -365,12 +319,64 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
         return retEntitys;
     }
 
+    @Override
+    public Set<String> getMatchedRecords(Set<String> groupSet, Set<String> topicSet) {
+        Set<String> groupKeySet = null;
+        Set<String> topicKeySet = null;
+        Set<String> totalMatchedSet = null;
+        ConcurrentHashSet<String> recSet;
+        // filter group items
+        if (groupSet != null && !groupSet.isEmpty()) {
+            groupKeySet = new HashSet<>();
+            for (String group : groupSet) {
+                recSet = grpConsumeCtrlGroupCache.get(group);
+                if (recSet != null && !recSet.isEmpty()) {
+                    groupKeySet.addAll(recSet);
+                }
+            }
+            if (groupKeySet.isEmpty()) {
+                return Collections.emptySet();
+            }
+        }
+        // filter topic items
+        if (topicSet != null && !topicSet.isEmpty()) {
+            topicKeySet = new HashSet<>();
+            for (String topic : topicSet) {
+                recSet = grpConsumeCtrlTopicCache.get(topic);
+                if (recSet != null && !recSet.isEmpty()) {
+                    topicKeySet.addAll(recSet);
+                }
+            }
+            if (topicKeySet.isEmpty()) {
+                return Collections.emptySet();
+            }
+        }
+        // get intersection from groupKeySet and topicKeySet
+        if (groupKeySet != null || topicKeySet != null) {
+            if (groupKeySet == null) {
+                totalMatchedSet = new HashSet<>(topicKeySet);
+            } else {
+                if (topicKeySet == null) {
+                    totalMatchedSet = new HashSet<>(groupKeySet);
+                } else {
+                    totalMatchedSet = new HashSet<>();
+                    for (String record : groupKeySet) {
+                        if (topicKeySet.contains(record)) {
+                            totalMatchedSet.add(record);
+                        }
+                    }
+                }
+            }
+        }
+        return totalMatchedSet;
+    }
+
     /**
      * Put Group consume configure info into bdb store
      *
      * @param memEntity need add record
      * @param result process result with old value
-     * @return
+     * @return true sucess, false failue
      */
     private boolean putGroupConsumeCtrlConfig2Bdb(
             GroupConsumeCtrlEntity memEntity, ProcessResult result) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
index 32bb156..2070b97 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
@@ -24,6 +24,7 @@ import com.sleepycat.persist.EntityStore;
 import com.sleepycat.persist.PrimaryIndex;
 import com.sleepycat.persist.StoreConfig;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -233,33 +234,11 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
                                                                 Set<Integer> brokerIdSet,
                                                                 TopicDeployEntity qryEntity) {
         List<TopicDeployEntity> items;
-        Set<String> qryTopicKeySet = null;
-        ConcurrentHashSet<String> keySet;
         Map<String, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
-        // get deploy records set by topicName
-        if (topicNameSet != null && !topicNameSet.isEmpty()) {
-            qryTopicKeySet = new HashSet<>();
-            for (String topicName : topicNameSet) {
-                keySet = topicNameCacheIndex.get(topicName);
-                if (keySet != null && !keySet.isEmpty()) {
-                    qryTopicKeySet.addAll(keySet);
-                }
-            }
-        }
-        // get deploy records set by brokerId
-        if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
-            if (qryTopicKeySet == null) {
-                qryTopicKeySet = new HashSet<>();
-            }
-            for (Integer brokerId : brokerIdSet) {
-                keySet = brokerIdCacheIndex.get(brokerId);
-                if (keySet != null && !keySet.isEmpty()) {
-                    qryTopicKeySet.addAll(keySet);
-                }
-            }
-        }
+        // get matched keys by topicNameSet and brokerIdSet
+        Set<String> matchedKeySet = getMatchedRecords(topicNameSet, brokerIdSet);
         // filter record by qryEntity
-        if (qryTopicKeySet == null) {
+        if (matchedKeySet == null) {
             for (TopicDeployEntity entry :  topicConfCache.values()) {
                 if (entry == null || (qryEntity != null && !entry.isMatched(qryEntity))) {
                     continue;
@@ -270,7 +249,7 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
             }
         } else {
             TopicDeployEntity entry;
-            for (String recKey : qryTopicKeySet) {
+            for (String recKey : matchedKeySet) {
                 entry = topicConfCache.get(recKey);
                 if (entry == null || (qryEntity != null && !entry.isMatched(qryEntity))) {
                     continue;
@@ -285,43 +264,21 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
 
     @Override
     public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(
-            Set<Integer> brokerIdSet, Set<String> topicNameSet) {
+            Set<String> topicNameSet, Set<Integer> brokerIdSet) {
         List<TopicDeployEntity> items;
-        Set<String> qryTopicKey = null;
-        ConcurrentHashSet<String> keySet;
         Map<Integer, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
         if (brokerIdSet != null) {
             for (Integer brokerId : brokerIdSet) {
                 retEntityMap.put(brokerId, new ArrayList<>());
             }
         }
-        if (topicNameSet != null && !topicNameSet.isEmpty()) {
-            qryTopicKey = new HashSet<>();
-            for (String topicName : topicNameSet) {
-                keySet = topicNameCacheIndex.get(topicName);
-                if (keySet != null && !keySet.isEmpty()) {
-                    qryTopicKey.addAll(keySet);
-                }
-            }
+        // get matched keys by topicNameSet and brokerIdSet
+        Set<String> matchedKeySet = getMatchedRecords(topicNameSet, brokerIdSet);
+        // get record by keys
+        if (matchedKeySet == null) {
+            matchedKeySet = new HashSet<>(topicConfCache.keySet());
         }
-        if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
-            if (qryTopicKey == null) {
-                qryTopicKey = new HashSet<>();
-            }
-            for (Integer brokerId : brokerIdSet) {
-                keySet = brokerIdCacheIndex.get(brokerId);
-                if (keySet != null && !keySet.isEmpty()) {
-                    qryTopicKey.addAll(keySet);
-                }
-            }
-        }
-        if (qryTopicKey == null) {
-            qryTopicKey = new HashSet<>(topicConfCache.keySet());
-        }
-        if (qryTopicKey.isEmpty()) {
-            return retEntityMap;
-        }
-        for (String recordKey: qryTopicKey) {
+        for (String recordKey: matchedKeySet) {
             TopicDeployEntity entity = topicConfCache.get(recordKey);
             if (entity == null) {
                 continue;
@@ -338,43 +295,29 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
             Set<String> topicSet, Set<Integer> brokerIdSet) {
         TopicDeployEntity tmpEntity;
         List<TopicDeployEntity> itemLst;
-        ConcurrentHashSet<String> recSet;
-        Set<String> hitKeys = new HashSet<>();
         Map<String, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
-        if (((topicSet == null) || (topicSet.isEmpty()))
-                && ((brokerIdSet == null) || (brokerIdSet.isEmpty()))) {
+        // get matched keys by topicNameSet and brokerIdSet
+        Set<String> matchedKeySet = getMatchedRecords(topicSet, brokerIdSet);
+        // get records by matched keys
+        if (matchedKeySet == null) {
             for (TopicDeployEntity entity : topicConfCache.values()) {
+                if (entity == null) {
+                    continue;
+                }
                 itemLst = retEntityMap.computeIfAbsent(
                         entity.getTopicName(), k -> new ArrayList<>());
                 itemLst.add(entity);
             }
-            return retEntityMap;
-        }
-        if ((topicSet == null) || (topicSet.isEmpty())) {
-            for (Integer brokerId : brokerIdSet) {
-                recSet = brokerIdCacheIndex.get(brokerId);
-                if (recSet == null || recSet.isEmpty()) {
-                    continue;
-                }
-                hitKeys.addAll(recSet);
-            }
         } else {
-            for (String topic : topicSet) {
-                recSet = topicNameCacheIndex.get(topic);
-                if (recSet == null || recSet.isEmpty()) {
+            for (String key : matchedKeySet) {
+                tmpEntity = topicConfCache.get(key);
+                if (tmpEntity == null) {
                     continue;
                 }
-                hitKeys.addAll(recSet);
-            }
-        }
-        for (String key : hitKeys) {
-            tmpEntity = topicConfCache.get(key);
-            if (tmpEntity == null) {
-                continue;
+                itemLst = retEntityMap.computeIfAbsent(
+                        tmpEntity.getTopicName(), k -> new ArrayList<>());
+                itemLst.add(tmpEntity);
             }
-            itemLst = retEntityMap.computeIfAbsent(
-                    tmpEntity.getTopicName(), k -> new ArrayList<>());
-            itemLst.add(tmpEntity);
         }
         return retEntityMap;
     }
@@ -537,6 +480,58 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
         }
     }
 
+    private Set<String> getMatchedRecords(Set<String> topicNameSet,
+                                          Set<Integer> brokerIdSet) {
+        ConcurrentHashSet<String> keySet;
+        Set<String> topicKeySet = null;
+        Set<String> brokerKeySet = null;
+        Set<String> matchedKeySet = null;
+        // get deploy records set by topicName
+        if (topicNameSet != null && !topicNameSet.isEmpty()) {
+            topicKeySet = new HashSet<>();
+            for (String topicName : topicNameSet) {
+                keySet = topicNameCacheIndex.get(topicName);
+                if (keySet != null && !keySet.isEmpty()) {
+                    topicKeySet.addAll(keySet);
+                }
+            }
+            if (topicKeySet.isEmpty()) {
+                return Collections.emptySet();
+            }
+        }
+        // get deploy records set by brokerId
+        if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
+            brokerKeySet = new HashSet<>();
+            for (Integer brokerId : brokerIdSet) {
+                keySet = brokerIdCacheIndex.get(brokerId);
+                if (keySet != null && !keySet.isEmpty()) {
+                    brokerKeySet.addAll(keySet);
+                }
+            }
+            if (brokerKeySet.isEmpty()) {
+                return Collections.emptySet();
+            }
+        }
+        // get intersection from topicKeySet and brokerKeySet
+        if (topicKeySet != null || brokerKeySet != null) {
+            if (topicKeySet == null) {
+                matchedKeySet = new HashSet<>(brokerKeySet);
+            } else {
+                if (brokerKeySet == null) {
+                    matchedKeySet = new HashSet<>(topicKeySet);
+                } else {
+                    matchedKeySet = new HashSet<>();
+                    for (String record : topicKeySet) {
+                        if (brokerKeySet.contains(record)) {
+                            matchedKeySet.add(record);
+                        }
+                    }
+                }
+            }
+        }
+        return matchedKeySet;
+    }
+
     private void addOrUpdCacheRecord(TopicDeployEntity entity) {
         topicConfCache.put(entity.getRecordKey(), entity);
         // add topic index map
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index a9d8205..7f1ab75 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -264,7 +264,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
         }
         Set<Integer> brokerIdSet = (Set<Integer>) result.getRetData();
         Map<Integer, List<TopicDeployEntity>> queryResult =
-                metaDataManager.getTopicDeployInfoMap(brokerIdSet, topicNameSet);
+                metaDataManager.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
         // build query result
         int dataCount = 0;
         int totalStoreNum = 0;