You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/03/08 09:03:04 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-571] Adjust WebOtherInfoHandler class implementation

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b61019  [TUBEMQ-571] Adjust WebOtherInfoHandler class implementation
4b61019 is described below

commit 4b6101993c4131a1a2ad2fdd32d70e76e7f7fcb9
Author: gosonzhang <go...@tencent.com>
AuthorDate: Sat Mar 6 20:44:10 2021 +0800

    [TUBEMQ-571] Adjust WebOtherInfoHandler class implementation
---
 .../org/apache/tubemq/corebase/utils/Tuple2.java   |  11 ++
 .../tubemq/server/common/fielddef/WebFieldDef.java |  72 ++++++++-
 .../common/{webbase => fielddef}/WebFieldType.java |   7 +-
 .../server/common/utils/WebParameterUtils.java     |   9 ++
 .../org/apache/tubemq/server/master/TMaster.java   |   2 +-
 .../nodemanage/nodebroker/TopicPSInfoManager.java  |  66 ++++++++
 .../nodemanage/nodeconsumer/ConsumerBandInfo.java  |  12 +-
 .../nodeconsumer/ConsumerInfoHolder.java           |  76 +++++++--
 .../server/master/web/action/screen/Master.java    |  14 +-
 .../master/web/handler/WebOtherInfoHandler.java    | 179 ++++++++++-----------
 .../nodebroker/TopicPSInfoManagerTest.java         |   2 +-
 11 files changed, 331 insertions(+), 119 deletions(-)

diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
index 048452f..c9f16e5 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
@@ -58,4 +58,15 @@ public class Tuple2<T0, T1> {
     public T1 getF1() {
         return f1;
     }
+
+    /**
+     * Set all field values
+     *
+     * @param value0 The value for field 0
+     * @param value1 The value for field 1
+     */
+    public void setF0AndF1(T0 value0, T1 value1) {
+        this.f0 = value0;
+        this.f1 = value1;
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 68a833b..ba6a10f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -21,7 +21,6 @@ import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.RegexDef;
 import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.webbase.WebFieldType;
 
 
 public enum WebFieldDef {
@@ -44,6 +43,7 @@ public enum WebFieldDef {
     MODIFYUSER(4, "modifyUser", "mur", WebFieldType.STRING,
             "Record modifier", TBaseConstants.META_MAX_USERNAME_LENGTH,
             RegexDef.TMP_STRING),
+
     MANUALOFFSET(5, "manualOffset", "offset", WebFieldType.LONG,
             "Reset offset value", RegexDef.TMP_NUMBER),
     MSGCOUNT(6, "msgCount", "cnt", WebFieldType.INT,
@@ -55,6 +55,7 @@ public enum WebFieldDef {
             "Require return disk offset details"),
     NEEDREFRESH(9, "needRefresh", "nrf", WebFieldType.BOOLEAN,
             "Require refresh data"),
+
     COMPSGROUPNAME(10, "groupName", "group", WebFieldType.COMPSTRING,
             "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
                    RegexDef.TMP_GROUP),
@@ -67,6 +68,7 @@ public enum WebFieldDef {
             "Caller ip address", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH),
     BROKERID(14, "brokerId", "brokerId", WebFieldType.INT,
             "Broker ID", RegexDef.TMP_NUMBER),
+
     COMPSBROKERID(15, "brokerId", "brokerId", WebFieldType.COMPINT,
             "Broker ID", RegexDef.TMP_NUMBER),
     WITHIP(16, "withIp", "ip", WebFieldType.BOOLEAN,
@@ -79,10 +81,11 @@ public enum WebFieldDef {
     TGTCOMPSGROUPNAME(19, "targetGroupName", "tgtGroup",
             WebFieldType.COMPSTRING, "Offset clone target group name",
             TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP),
+
     MANUALSET(20, "manualSet", "manSet",
             WebFieldType.BOOLEAN, "Whether manual offset setting mode"),
     OFFSETJSON(21, "offsetJsonInfo", "offsetInfo",
-            WebFieldType.JSONTYPE, "The offset info that needs to be added or modified"),
+            WebFieldType.JSONDICT, "The offset info that needs to be added or modified"),
     ONLYMEM(22, "onlyMemory", "onlyMem", WebFieldType.BOOLEAN,
             "Only clear the offset data in the memory cache, default is false"),
     ADMINAUTHTOKEN(23, "confModAuthToken", "authToken", WebFieldType.STRING,
@@ -90,6 +93,7 @@ public enum WebFieldDef {
             TServerConstants.CFG_MODAUTHTOKEN_MAX_LENGTH),
     MAXMSGSIZE(24, "maxMsgSizeInMB", "maxMsgSizeInMB", WebFieldType.INT,
             "Max allowed message size, unit MB", RegexDef.TMP_NUMBER),
+
     CREATEDATE(25, "createDate", "cDate", WebFieldType.STRING,
             "Record creation date", TBaseConstants.META_MAX_DATEVALUE_LENGTH),
     MODIFYDATE(26, "modifyDate", "mDate", WebFieldType.STRING,
@@ -97,12 +101,73 @@ public enum WebFieldDef {
     HOSTNAME(27, "hostName", "hostName", WebFieldType.STRING,
             "Host name information", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH),
     CLIENTID(28, "clientId", "clientId", WebFieldType.STRING,
-            "Client ID", TBaseConstants.META_MAX_CLIENT_ID_LENGTH);
+            "Client ID", TBaseConstants.META_MAX_CLIENT_ID_LENGTH),
+    @Deprecated
+    CONSUMEGROUP(29, "consumeGroup", "group", WebFieldType.STRING,
+            "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
+            RegexDef.TMP_GROUP),
+
+    @Deprecated
+    COMPSCONSUMEGROUP(30, "consumeGroup", "group", WebFieldType.COMPSTRING,
+            "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
+                   RegexDef.TMP_GROUP),
+    REGIONID(31, "regionId", "regionId", WebFieldType.INT,
+            "Region id", RegexDef.TMP_NUMBER),
+    COMPREGIONID(32, "regionId", "regionId", WebFieldType.COMPINT,
+            "Region id", RegexDef.TMP_NUMBER),
+    DATAVERSIONID(33, "dataVersionId", "dataVerId", WebFieldType.LONG,
+            "Data version id", RegexDef.TMP_NUMBER),
+    TOPICNAMEID(34, "topicNameId", "topicId", WebFieldType.LONG,
+            "Topic name id", RegexDef.TMP_NUMBER),
 
+    COMPTOPICNAMEID(35, "topicNameId", "topicId", WebFieldType.COMPLONG,
+            "Topic name id", RegexDef.TMP_NUMBER),
+    NUMTOPICSTORES(36, "numTopicStores", "numStore", WebFieldType.INT,
+            "Number of topic stores", RegexDef.TMP_NUMBER),
+    NUMPARTITIONS(37, "numPartitions", "numPart", WebFieldType.INT,
+            "Number of partitions", RegexDef.TMP_NUMBER),
+    UNFLUSHTHRESHOLD(38, "unflushThreshold", "unfDskMsgCnt", WebFieldType.INT,
+            "Maximum allowed disk unflushing message count", RegexDef.TMP_NUMBER),
+    UNFLUSHINTERVAL(39, "unflushInterval", "unfDskInt", WebFieldType.INT,
+            "Maximum allowed disk unflushing interval", RegexDef.TMP_NUMBER),
 
+    UNFLUSHDATAHOLD(40, "unflushDataHold", "unfDskDataSize", WebFieldType.INT,
+            "Maximum allowed disk unflushing data size", RegexDef.TMP_NUMBER),
+    MCACHESIZEINMB(41, "memCacheMsgSizeInMB", "cacheSizeInMB", WebFieldType.INT,
+            "Maximum allowed memory cache size in MB", RegexDef.TMP_NUMBER),
+    UNFMCACHECNTINK(42, "memCacheMsgCntInK", "unfMemMsgCnt", WebFieldType.INT,
+            "Maximum allowed memory cache unflushing message count", RegexDef.TMP_NUMBER),
+    UNFMCACHEINTERVAL(43, "memCacheFlushIntvl", "unfMemInt", WebFieldType.INT,
+            "Maximum allowed disk unflushing data size", RegexDef.TMP_NUMBER),
+    MAXMSGSIZEINMB(44, "maxMsgSizeInMB", "maxMsgSizeInMB", WebFieldType.INT,
+            "Maximum allowed message length", RegexDef.TMP_NUMBER),
 
+    ACCEPTPUBLISH(45, "acceptPublish", "accPub", WebFieldType.BOOLEAN,
+            "Enable publishing"),
+    ACCEPTSUBSCRIBE(46, "acceptSubscribe", "accSub", WebFieldType.BOOLEAN,
+            "Enable subscription"),
+    DELETEPOLICY(47, "deletePolicy", "delPolicy",
+            WebFieldType.DELPOLICY, "File aging strategy"),
+    TOPICJSONSET(48, "topicJsonSet", "topicSet",
+            WebFieldType.JSONSET, "The topic info set that needs to be added or modified"),
+    BROKERIP(49, "brokerIp", "brokerIp", WebFieldType.STRING,
+            "Broker ip", TBaseConstants.META_MAX_BROKER_IP_LENGTH,
+            RegexDef.TMP_IPV4ADDRESS),
 
+    BROKERPORT(50, "brokerPort", "brokerPort", WebFieldType.INT,
+            "Broker port", RegexDef.TMP_NUMBER),
+    BROKERTLSPORT(51, "brokerTLSPort", "brokerTLSPort", WebFieldType.INT,
+            "Broker tls port", RegexDef.TMP_NUMBER),
+    BROKERJSONSET(52, "brokerJsonSet", "brokerSet",
+            WebFieldType.JSONSET, "The broker info set that needs to be added or modified"),
+    STATUSID(53, "statusId", "statusId", WebFieldType.INT,
+            "Status id", RegexDef.TMP_NUMBER),
+    QRYPRIORITYID(54, "qryPriorityId", "qryPriId", WebFieldType.INT,
+            "Query priority id", RegexDef.TMP_NUMBER),
 
+    FLOWCTRLSET(55, "flowCtrlInfo", "flowCtrlSet",
+            WebFieldType.JSONSET,
+            "The flow control info set that needs to be added or modified");
 
 
 
@@ -174,6 +239,7 @@ public enum WebFieldDef {
 
     public boolean isCompFieldType() {
         return (this.type == WebFieldType.COMPINT
+                || this.type == WebFieldType.COMPLONG
                 || this.type == WebFieldType.COMPSTRING);
     }
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldType.java
similarity index 86%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldType.java
index a3e037f..7109d9e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldType.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.server.common.webbase;
+package org.apache.tubemq.server.common.fielddef;
 
 
 
@@ -29,7 +29,10 @@ public enum WebFieldType {
     DATE(5, "Date"),
     COMPSTRING(6, "Compound string"),
     COMPINT(7, "Compound integer"),
-    JSONTYPE(8, "Json");
+    COMPLONG(8, "Compound long"),
+    JSONDICT(9, "Json dict"),
+    JSONSET(10, "Json set"),
+    DELPOLICY(11, "Delete policy");
 
 
     private int value;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 161966d..2747efd 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -251,6 +251,15 @@ public class WebParameterUtils {
         return strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"").
                 append(appendInfo).append("\"}");
     }
+
+    public static StringBuilder buildSuccessWithDataRetBegin(StringBuilder strBuffer) {
+        return strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
+    }
+    public static StringBuilder buildSuccessWithDataRetEnd(
+            StringBuilder strBuffer, int totalCnt) {
+        return strBuffer.append("],\"count\":").append(totalCnt).append("}");
+    }
+
     /**
      * Parse the parameter value from an object value to a long value
      *
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index e7327cb..de510a6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -176,7 +176,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         this.producerHolder = new ProducerInfoHolder();
         this.consumerHolder = new ConsumerInfoHolder();
         this.consumerEventManager = new ConsumerEventManager(consumerHolder);
-        this.topicPSInfoManager = new TopicPSInfoManager();
+        this.topicPSInfoManager = new TopicPSInfoManager(this);
         this.loadBalancer = new DefaultLoadBalancer();
         this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
                 false, TBaseConstants.META_VALUE_UNDEFINED);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
index 2b0ffc9..19371a1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
@@ -29,12 +29,15 @@ import org.apache.tubemq.corebase.cluster.BrokerInfo;
 import org.apache.tubemq.corebase.cluster.Partition;
 import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
+import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
 
 /**
  * Topic Publication/Subscription info management
  */
 public class TopicPSInfoManager {
 
+    private final TMaster master;
     private final ConcurrentHashMap<String/* topic */,
             ConcurrentHashMap<BrokerInfo, TopicInfo>> brokerPubInfoMap =
             new ConcurrentHashMap<>();
@@ -45,6 +48,10 @@ public class TopicPSInfoManager {
             ConcurrentHashSet<String/* group */>> topicSubInfoMap =
             new ConcurrentHashMap<>();
 
+    public TopicPSInfoManager(TMaster master) {
+        this.master = master;
+    }
+
     /**
      * Get groups according to topic
      *
@@ -283,4 +290,63 @@ public class TopicPSInfoManager {
         topicPubInfoMap.clear();
         topicSubInfoMap.clear();
     }
+
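+    /**
+     * Get the consumer groups that match the queried groups and subscribed topics.
+     * If the topic set is empty, the candidates are all online consumer groups;
+     * otherwise they are the groups recorded as subscribing to any given topic.
+     * If the query group set is empty, all candidates are returned.
+     *
+     * @param qryGroupSet  groups to look up, empty means all; matched entries may be removed from it
+     * @param subTopicSet  subscribed topics to filter by, empty means no topic filter
+     * @return the matching group set, empty if nothing matches
+     */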
+    public Set<String> getGroupSetWithSubTopic(Set<String> qryGroupSet,
+                                               Set<String> subTopicSet) {
+        Set<String> resultSet = new HashSet<>();
+        if (subTopicSet.isEmpty()) {
+            // get all online group
+            ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
+            List<String> onlineGroups = consumerHolder.getAllGroup();
+            if (!onlineGroups.isEmpty()) {
+                if (qryGroupSet.isEmpty()) {
+                    resultSet.addAll(onlineGroups);
+                } else {
+                    for (String group : qryGroupSet) {
+                        if (onlineGroups.contains(group)) {
+                            resultSet.add(group);
+                        }
+                    }
+                }
+            }
+        } else {
+            // filter subscribed online group
+            Set<String> tmpGroupSet;
+            if (qryGroupSet.isEmpty()) {
+                for (String topic : subTopicSet) {
+                    tmpGroupSet = topicSubInfoMap.get(topic);
+                    if (tmpGroupSet != null && !tmpGroupSet.isEmpty()) {
+                        resultSet.addAll(tmpGroupSet);
+                    }
+                }
+            } else {
+                for (String topic : subTopicSet) {
+                    tmpGroupSet = topicSubInfoMap.get(topic);
+                    if (tmpGroupSet == null || tmpGroupSet.isEmpty()) {
+                        continue;
+                    }
+                    for (String group : qryGroupSet) {
+                        if (tmpGroupSet.contains(group)) {
+                            resultSet.add(group);
+                        }
+                    }
+                    qryGroupSet.removeAll(resultSet);
+                    if (qryGroupSet.isEmpty()) {
+                        break;
+                    }
+                }
+            }
+        }
+        return resultSet;
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
index 97e9f89..b925ddf 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
@@ -446,11 +446,21 @@ public class ConsumerBandInfo {
     }
 
     public List<ConsumerInfo> getConsumerInfoList() {
-        List<ConsumerInfo> result = new ArrayList<ConsumerInfo>();
+        List<ConsumerInfo> result = new ArrayList<>();
         result.addAll(this.consumerInfoMap.values());
         return result;
     }
 
+    public List<ConsumerInfo> cloneConsumerInfoList() {
+        List<ConsumerInfo> result = new ArrayList<>();
+        for (ConsumerInfo consumer : this.consumerInfoMap.values()) {
+            if (consumer != null) {
+                result.add(consumer.clone());
+            }
+        }
+        return result;
+    }
+
     public ConsumerInfo getConsumerInfo(String consumerId) {
         return consumerInfoMap.get(consumerId);
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index fc5d932..d8a182e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -19,7 +19,6 @@ package org.apache.tubemq.server.master.nodemanage.nodeconsumer;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -217,20 +216,73 @@ public class ConsumerInfoHolder {
     }
 
     public Set<String> getGroupTopicSet(String group) {
-        if (group == null) {
-            return new HashSet<>();
+        if (group != null) {
+            try {
+                rwLock.readLock().lock();
+                ConsumerBandInfo oldConsumeBandInfo =
+                        groupInfoMap.get(group);
+                if (oldConsumeBandInfo != null) {
+                    return oldConsumeBandInfo.getTopicSet();
+                }
+            } finally {
+                rwLock.readLock().unlock();
+            }
         }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo oldConsumeBandInfo =
-                    groupInfoMap.get(group);
-            if (oldConsumeBandInfo != null) {
-                return oldConsumeBandInfo.getTopicSet();
+        return Collections.emptySet();
+    }
+
+    /**
+     * Get the group's subscribed topics and a cloned list of its ConsumerInfos.
+     *
+     * @param group   the group to be queried
+     * @param result  holder filled with the query result; treat its contents as read-only
+     * @return true if the group was found, false otherwise
+     */
+    public boolean getGroupTopicSetAndConsumerInfos(
+            String group, Tuple2<Set<String>, List<ConsumerInfo>> result) {
+        if (group != null) {
+            try {
+                rwLock.readLock().lock();
+                ConsumerBandInfo curGroupInfo =
+                        groupInfoMap.get(group);
+                if (curGroupInfo != null) {
+                    result.setF0AndF1(curGroupInfo.getTopicSet(),
+                            curGroupInfo.cloneConsumerInfoList());
+                    return true;
+                }
+            } finally {
+                rwLock.readLock().unlock();
             }
-        } finally {
-            rwLock.readLock().unlock();
         }
-        return new HashSet<>();
+        result.setF0AndF1(Collections.emptySet(), Collections.emptyList());
+        return false;
+    }
+
+    /**
+     * Get the group's subscribed topics and client count.
+     *
+     * @param group   the group to be queried
+     * @param result  holder filled with the query result; treat its contents as read-only
+     * @return true if the group was found, false otherwise
+     */
+    public boolean getGroupTopicSetAndClientCnt(String group,
+                                                Tuple2<Set<String>, Integer> result) {
+        if (group != null) {
+            try {
+                rwLock.readLock().lock();
+                ConsumerBandInfo curGroupInfo =
+                        groupInfoMap.get(group);
+                if (curGroupInfo != null) {
+                    result.setF0AndF1(curGroupInfo.getTopicSet(),
+                            curGroupInfo.getGroupCnt());
+                    return true;
+                }
+            } finally {
+                rwLock.readLock().unlock();
+            }
+        }
+        result.setF0AndF1(Collections.emptySet(), 0);
+        return false;
     }
 
     /**
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
index 60dd31b..502a2e2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
@@ -32,6 +32,7 @@ import org.apache.tubemq.corebase.cluster.ProducerInfo;
 import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.corerpc.exception.StandbyException;
 import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
@@ -298,13 +299,16 @@ public class Master implements Action {
         TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
         Map<String, Map<String, Map<String, Partition>>> currentSubInfoMap =
                 master.getCurrentSubInfoMap();
+        int currPartSize = 0;
         List<String> groupList = consumerHolder.getAllGroup();
+        Tuple2<Set<String>, List<ConsumerInfo>> queryInfo = new Tuple2<>();
         for (String group : groupList) {
-            Set<String> topicSet = consumerHolder.getGroupTopicSet(group);
-            for (String topic : topicSet) {
-                int currPartSize = 0;
-                List<ConsumerInfo> consumerList = consumerHolder.getConsumerList(group);
-                for (ConsumerInfo consumer : consumerList) {
+            if (!consumerHolder.getGroupTopicSetAndConsumerInfos(group, queryInfo)) {
+                continue;
+            }
+            for (String topic : queryInfo.getF0()) {
+                currPartSize = 0;
+                for (ConsumerInfo consumer : queryInfo.getF1()) {
                     Map<String, Map<String, Partition>> consumerSubInfoMap =
                             currentSubInfoMap.get(consumer.getConsumerId());
                     if (consumerSubInfoMap != null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 2e7e993..214bfce 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -27,10 +27,11 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.servlet.http.HttpServletRequest;
-import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.cluster.ConsumerInfo;
 import org.apache.tubemq.corebase.cluster.Partition;
-import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.common.utils.WebParameterUtils;
 import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
@@ -67,75 +68,56 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
      * @return
      */
     public StringBuilder getSubscribeInfo(HttpServletRequest req) {
-        StringBuilder strBuffer = new StringBuilder();
-        try {
-            String strConsumeGroup = WebParameterUtils.validGroupParameter("consumeGroup",
-                    req.getParameter("consumeGroup"), TBaseConstants.META_MAX_GROUPNAME_LENGTH, false, "");
-            String strTopicName = WebParameterUtils.validStringParameter("topicName",
-                    req.getParameter("topicName"), TBaseConstants.META_MAX_TOPICNAME_LENGTH, false, "");
-            ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
-            TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
-            List<String> queryGroupSet = new ArrayList<>();
-            if (TStringUtils.isEmpty(strTopicName)) {
-                List<String> tmpGroupSet = consumerHolder.getAllGroup();
-                if (!tmpGroupSet.isEmpty()) {
-                    if (TStringUtils.isEmpty(strConsumeGroup)) {
-                        for (String tmpGroup : tmpGroupSet) {
-                            if (tmpGroup != null) {
-                                queryGroupSet.add(tmpGroup);
-                            }
-                        }
-                    } else {
-                        if (tmpGroupSet.contains(strConsumeGroup)) {
-                            queryGroupSet.add(strConsumeGroup);
-                        }
-                    }
-                }
-            } else {
-                Set<String> groupSet = topicPSInfoManager.getTopicSubInfo(strTopicName);
-                if ((groupSet != null) && (!groupSet.isEmpty())) {
-                    if (TStringUtils.isEmpty(strConsumeGroup)) {
-                        for (String tmpGroup : groupSet) {
-                            if (tmpGroup != null) {
-                                queryGroupSet.add(tmpGroup);
-                            }
-                        }
-                    } else {
-                        if (groupSet.contains(strConsumeGroup)) {
-                            queryGroupSet.add(strConsumeGroup);
-                        }
-                    }
-                }
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(1024);
+        // get group list
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, false, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<String> inGroupNameSet = (Set<String>) result.retData1;
+        if (inGroupNameSet.isEmpty()) {
+            if (!WebParameterUtils.getStringParamValue(req,
+                    WebFieldDef.COMPSCONSUMEGROUP, false, null, result)) {
+                WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+                return sBuilder;
             }
-            if (queryGroupSet.isEmpty()) {
-                strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"count\":0,\"data\":[]}");
-            } else {
-                int totalCnt = 0;
-                strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"count\":")
-                        .append(queryGroupSet.size()).append(",\"data\":[");
-                for (String tmpGroup : queryGroupSet) {
-                    Set<String> topicSet = consumerHolder.getGroupTopicSet(tmpGroup);
-                    final int consuemrCnt = consumerHolder.getConsumerCnt(tmpGroup);
-                    if (totalCnt++ > 0) {
-                        strBuffer.append(",");
-                    }
-                    strBuffer.append("{\"consumeGroup\":\"").append(tmpGroup).append("\",\"topicSet\":[");
-                    int topicCnt = 0;
-                    for (String tmpTopic : topicSet) {
-                        if (topicCnt++ > 0) {
-                            strBuffer.append(",");
-                        }
-                        strBuffer.append("\"").append(tmpTopic).append("\"");
-                    }
-                    strBuffer.append("],\"consumerNum\":").append(consuemrCnt).append("}");
+            inGroupNameSet = (Set<String>) result.retData1;
+        }
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+        Set<String> queryGroupSet =
+                topicPSInfoManager.getGroupSetWithSubTopic(inGroupNameSet, topicNameSet);
+        int totalCnt = 0;
+        int topicCnt = 0;
+        Tuple2<Set<String>, Integer> queryInfo = new Tuple2<>();
+        ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+        for (String group : queryGroupSet) {
+            if (!consumerHolder.getGroupTopicSetAndClientCnt(group, queryInfo)) {
+                continue;
+            }
+            if (totalCnt++ > 0) {
+                sBuilder.append(",");
+            }
+            sBuilder.append("{\"consumeGroup\":\"").append(group).append("\",\"topicSet\":[");
+            topicCnt = 0;
+            for (String tmpTopic : queryInfo.getF0()) {
+                if (topicCnt++ > 0) {
+                    sBuilder.append(",");
                 }
-                strBuffer.append("]}");
+                sBuilder.append("\"").append(tmpTopic).append("\"");
             }
-        } catch (Throwable e) {
-            strBuffer.append("{\"result\":false,\"errCode\":500,\"errMsg\":\"Exception on process")
-                    .append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
+            sBuilder.append("],\"consumerNum\":").append(queryInfo.getF1()).append("}");
         }
-        return strBuffer;
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
+        return sBuilder;
     }
 
     /**
@@ -146,10 +128,19 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
      */
     // #lizard forgives
     public StringBuilder getConsumeGroupDetailInfo(HttpServletRequest req) {
-        StringBuilder strBuffer = new StringBuilder(1024);
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(1024);
+        // get group name
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.GROUPNAME, true, null, result)) {
+            if (!WebParameterUtils.getStringParamValue(req,
+                    WebFieldDef.CONSUMEGROUP, true, null, result)) {
+                WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+                return sBuilder;
+            }
+        }
+        String strConsumeGroup = (String) result.retData1;
         try {
-            String strConsumeGroup = WebParameterUtils.validGroupParameter("consumeGroup",
-                    req.getParameter("consumeGroup"), TBaseConstants.META_MAX_GROUPNAME_LENGTH, true, "");
             boolean isBandConsume = false;
             boolean isNotAllocate = false;
             boolean isSelectBig = true;
@@ -194,78 +185,78 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
                     rebalanceCheckTime = consumerBandInfo.getCurCheckCycle();
                 }
             }
-            strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"")
+            sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"")
                     .append(",\"count\":").append(consumerList.size()).append(",\"topicSet\":[");
             int itemCnt = 0;
             for (String topicItem : topicSet) {
                 if (itemCnt++ > 0) {
-                    strBuffer.append(",");
+                    sBuilder.append(",");
                 }
-                strBuffer.append("\"").append(topicItem).append("\"");
+                sBuilder.append("\"").append(topicItem).append("\"");
             }
-            strBuffer.append("],\"consumeGroup\":\"").append(strConsumeGroup).append("\",\"re-rebalance\":{");
+            sBuilder.append("],\"consumeGroup\":\"").append(strConsumeGroup).append("\",\"re-rebalance\":{");
             itemCnt = 0;
             for (Map.Entry<String, NodeRebInfo> entry : nodeRebInfoMap.entrySet()) {
                 if (itemCnt++ > 0) {
-                    strBuffer.append(",");
+                    sBuilder.append(",");
                 }
-                strBuffer.append("\"").append(entry.getKey()).append("\":");
-                strBuffer = entry.getValue().toJsonString(strBuffer);
+                sBuilder.append("\"").append(entry.getKey()).append("\":");
+                sBuilder = entry.getValue().toJsonString(sBuilder);
             }
-            strBuffer.append("},\"isBandConsume\":").append(isBandConsume);
+            sBuilder.append("},\"isBandConsume\":").append(isBandConsume);
             // Append band consume info
             if (isBandConsume) {
-                strBuffer.append(",\"isNotAllocate\":").append(isNotAllocate)
+                sBuilder.append(",\"isNotAllocate\":").append(isNotAllocate)
                         .append(",\"sessionKey\":\"").append(sessionKey)
                         .append("\",\"isSelectBig\":").append(isSelectBig)
                         .append(",\"reqSourceCount\":").append(reqSourceCount)
                         .append(",\"curSourceCount\":").append(curSourceCount)
                         .append(",\"rebalanceCheckTime\":").append(rebalanceCheckTime);
             }
-            strBuffer.append(",\"rebInfo\":{");
+            sBuilder.append(",\"rebInfo\":{");
             if (rebalanceStatus == -2) {
-                strBuffer.append("\"isRebalanced\":false");
+                sBuilder.append("\"isRebalanced\":false");
             } else if (rebalanceStatus == 0) {
-                strBuffer.append("\"isRebalanced\":true,\"checkPasted\":false")
+                sBuilder.append("\"isRebalanced\":true,\"checkPasted\":false")
                         .append(",\"defBClientRate\":").append(defBClientRate)
                         .append(",\"confBClientRate\":").append(confBClientRate)
                         .append(",\"curBClientRate\":").append(curBClientRate)
                         .append(",\"minRequireClientCnt\":").append(minRequireClientCnt);
             } else {
-                strBuffer.append("\"isRebalanced\":true,\"checkPasted\":true")
+                sBuilder.append("\"isRebalanced\":true,\"checkPasted\":true")
                         .append(",\"defBClientRate\":").append(defBClientRate)
                         .append(",\"confBClientRate\":").append(confBClientRate)
                         .append(",\"curBClientRate\":").append(curBClientRate);
             }
-            strBuffer.append("},\"filterConds\":{");
+            sBuilder.append("},\"filterConds\":{");
             if (existedTopicConditions != null) {
                 int keyCount = 0;
                 for (Map.Entry<String, TreeSet<String>> entry : existedTopicConditions.entrySet()) {
                     if (keyCount++ > 0) {
-                        strBuffer.append(",");
+                        sBuilder.append(",");
                     }
-                    strBuffer.append("\"").append(entry.getKey()).append("\":[");
+                    sBuilder.append("\"").append(entry.getKey()).append("\":[");
                     if (entry.getValue() != null) {
                         int itemCount = 0;
                         for (String filterCond : entry.getValue()) {
                             if (itemCount++ > 0) {
-                                strBuffer.append(",");
+                                sBuilder.append(",");
                             }
-                            strBuffer.append("\"").append(filterCond).append("\"");
+                            sBuilder.append("\"").append(filterCond).append("\"");
                         }
                     }
-                    strBuffer.append("]");
+                    sBuilder.append("]");
                 }
             }
-            strBuffer.append("}");
+            sBuilder.append("}");
             // Append consumer info of the group
-            getConsumerInfoList(consumerList, isBandConsume, strBuffer);
-            strBuffer.append("}");
+            getConsumerInfoList(consumerList, isBandConsume, sBuilder);
+            sBuilder.append("}");
         } catch (Exception e) {
-            strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+            sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
                     .append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
         }
-        return strBuffer;
+        return sBuilder;
     }
 
     /**
diff --git a/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java b/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
index 76e9cae..ba988aa 100644
--- a/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
+++ b/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
@@ -31,7 +31,7 @@ public class TopicPSInfoManagerTest {
 
     @Before
     public void setUp() throws Exception {
-        topicPSInfoManager = new TopicPSInfoManager();
+        topicPSInfoManager = new TopicPSInfoManager(null);
     }
 
     @After