You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/11/14 04:19:15 UTC

[incubator-inlong] branch master updated: [INLONG-1789]Adjust the logic of the consumer group on the Master side (#1790)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 514b53e  [INLONG-1789]Adjust the logic of the consumer group on the Master side (#1790)
514b53e is described below

commit 514b53eccb9d278289f6225f73c8a8ac5099d500
Author: gosonzhang <46...@qq.com>
AuthorDate: Sun Nov 14 12:19:09 2021 +0800

    [INLONG-1789]Adjust the logic of the consumer group on the Master side (#1790)
---
 .../inlong/tubemq/corebase/TErrCodeConstants.java  |  14 +
 .../tubemq/corebase/cluster/ConsumerInfo.java      | 164 -----
 .../server/common/paramcheck/PBParameterUtils.java | 178 +----
 .../tubemq/server/common/utils/ClientSyncInfo.java | 113 ++++
 .../inlong/tubemq/server/master/TMaster.java       | 143 ++--
 .../server/master/balance/DefaultLoadBalancer.java | 247 +++----
 .../tubemq/server/master/balance/LoadBalancer.java |   5 +-
 .../server/master/metamanage/MetaDataManager.java  |   2 +-
 .../nodemanage/nodebroker/TopicPSInfoManager.java  |  77 +--
 .../nodemanage/nodeconsumer/ConsumeGroupInfo.java  | 725 +++++++++++++++++++++
 .../nodemanage/nodeconsumer/ConsumeType.java       |  55 ++
 .../nodemanage/nodeconsumer/ConsumerBandInfo.java  | 495 --------------
 .../nodeconsumer/ConsumerEventManager.java         |  20 +-
 .../nodemanage/nodeconsumer/ConsumerInfo.java      | 244 +++++++
 .../nodeconsumer/ConsumerInfoHolder.java           | 595 ++++++++---------
 .../nodemanage/nodeconsumer/TopicConfigInfo.java   |  65 ++
 .../server/master/web/action/screen/Master.java    | 104 +--
 .../web/handler/WebAdminGroupCtrlHandler.java      |  10 +-
 .../master/web/handler/WebOtherInfoHandler.java    |  81 ++-
 .../nodebroker/TopicPSInfoManagerTest.java         |  20 +-
 20 files changed, 1808 insertions(+), 1549 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
index 3210122..30ff827 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
@@ -27,6 +27,7 @@ public class TErrCodeConstants {
 
     public static final int BAD_REQUEST = 400;
     public static final int UNAUTHORIZED = 401;
+    public static final int CONNECT_RETURN_NULL = 402;
     public static final int FORBIDDEN = 403;
     public static final int NOT_FOUND = 404;
     public static final int ALL_PARTITION_FROZEN = 405;
@@ -38,6 +39,18 @@ public class TErrCodeConstants {
     public static final int DUPLICATE_PARTITION = 412;
     public static final int CERTIFICATE_FAILURE = 415;
     public static final int SERVER_RECEIVE_OVERFLOW = 419;
+    public static final int CLIENT_SHUTDOWN = 420;
+    public static final int CLIENT_HIGH_FREQUENCY_REQUEST = 421;
+    public static final int PARTITION_UNSUBSCRIBABLE = 422;
+    public static final int PARTITION_UNPUBLISHABLE = 423;
+    public static final int CLIENT_INCONSISTENT_CONSUMETYPE = 424;
+    public static final int CLIENT_INCONSISTENT_TOPICSET = 425;
+    public static final int CLIENT_INCONSISTENT_FILTERSET = 426;
+    public static final int CLIENT_INCONSISTENT_SESSIONKEY = 427;
+    public static final int CLIENT_INCONSISTENT_SELECTBIG = 428;
+    public static final int CLIENT_INCONSISTENT_SOURCECOUNT = 429;
+    public static final int CLIENT_DUPLICATE_INDEXID = 430;
+
     public static final int CONSUME_GROUP_FORBIDDEN = 450;
     public static final int SERVER_CONSUME_SPEED_LIMIT = 452;
     public static final int CONSUME_CONTENT_FORBIDDEN = 455;
@@ -45,6 +58,7 @@ public class TErrCodeConstants {
     public static final int INTERNAL_SERVER_ERROR = 500;
     public static final int SERVICE_UNAVAILABLE = 503;
     public static final int INTERNAL_SERVER_ERROR_MSGSET_NULL = 510;
+    public static final int UNSPECIFIED_ABNORMAL = 599;
 
     public static final List<Integer> IGNORE_ERROR_SET =
             Arrays.asList(BAD_REQUEST, NOT_FOUND, ALL_PARTITION_FROZEN,
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/ConsumerInfo.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/ConsumerInfo.java
deleted file mode 100644
index 9a1e32c..0000000
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/ConsumerInfo.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.corebase.cluster;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
-
-public class ConsumerInfo implements Comparable<ConsumerInfo>, Serializable {
-
-    private static final long serialVersionUID = 3095734962491009711L;
-
-    private final String consumerId;
-    private final String group;
-    private final Set<String> topicSet;
-    private final Map<String, TreeSet<String>> topicConditions;
-    private boolean requireBound = false;
-    private String sessionKey = "";
-    private long startTime = TBaseConstants.META_VALUE_UNDEFINED;
-    private int sourceCount = TBaseConstants.META_VALUE_UNDEFINED;
-    private boolean overTLS = false;
-    private Map<String, Long> requiredPartition;
-
-    public ConsumerInfo(String consumerId,
-                        boolean overTLS,
-                        String group,
-                        Set<String> topicSet,
-                        Map<String, TreeSet<String>> topicConditions,
-                        boolean requireBound,
-                        String sessionKey,
-                        long startTime,
-                        int sourceCount,
-                        Map<String, Long> requiredPartition) {
-        this.consumerId = consumerId;
-        this.overTLS = overTLS;
-        this.group = group;
-        this.topicSet = topicSet;
-        if (topicConditions == null) {
-            this.topicConditions =
-                    new HashMap<>();
-        } else {
-            this.topicConditions = topicConditions;
-        }
-        this.requireBound = requireBound;
-        this.sessionKey = sessionKey;
-        this.startTime = startTime;
-        this.sourceCount = sourceCount;
-        this.requiredPartition = requiredPartition;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sBuilder = new StringBuilder(512);
-        sBuilder.append(consumerId);
-        sBuilder.append("@");
-        sBuilder.append(group);
-        sBuilder.append(":");
-        int count = 0;
-        for (String topicItem : topicSet) {
-            if (count++ > 0) {
-                sBuilder.append(",");
-            }
-            sBuilder.append(topicItem);
-        }
-        sBuilder.append("@overTLS=").append(overTLS);
-        return sBuilder.toString();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (!(obj instanceof ConsumerInfo)) {
-            return false;
-        }
-        final ConsumerInfo info = (ConsumerInfo) obj;
-        return (this.consumerId.equals(info.getConsumerId()));
-    }
-
-    @Override
-    public int compareTo(ConsumerInfo o) {
-        if (!this.consumerId.equals(o.consumerId)) {
-            return this.consumerId.compareTo(o.consumerId);
-        }
-        if (!this.group.equals(o.group)) {
-            return this.group.compareTo(o.group);
-        }
-        return 0;
-    }
-
-    public boolean isRequireBound() {
-        return this.requireBound;
-    }
-
-    public String getSessionKey() {
-        return this.sessionKey;
-    }
-
-    public void setSessionKey(String sessionKey) {
-        this.sessionKey = sessionKey;
-    }
-
-    public long getStartTime() {
-        return this.startTime;
-    }
-
-    public int getSourceCount() {
-        return this.sourceCount;
-    }
-
-    public Map<String, Long> getRequiredPartition() {
-        return this.requiredPartition;
-    }
-
-    public String getConsumerId() {
-        return consumerId;
-    }
-
-    public String getGroup() {
-        return group;
-    }
-
-    public Set<String> getTopicSet() {
-        return topicSet;
-    }
-
-    public Map<String, TreeSet<String>> getTopicConditions() {
-        return topicConditions;
-    }
-
-    public boolean isOverTLS() {
-        return overTLS;
-    }
-
-    @Override
-    public ConsumerInfo clone() {
-        return new ConsumerInfo(this.consumerId, this.overTLS, this.group, this.topicSet,
-                this.topicConditions, this.requireBound, this.sessionKey,
-                this.startTime, this.sourceCount, this.requiredPartition);
-    }
-
-}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
index 0ae10a5..83ba6d7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -22,11 +22,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
 import org.apache.inlong.tubemq.corebase.TokenConstants;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.server.broker.metadata.MetadataManager;
@@ -36,7 +34,8 @@ import org.apache.inlong.tubemq.server.master.MasterConfig;
 import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
-import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeType;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,13 +121,13 @@ public class PBParameterUtils {
         return retResult;
     }
 
-    public static ParamCheckResult checkConsumerOffsetSetInfo(boolean isReqConsumeBand,
+    public static ParamCheckResult checkConsumerOffsetSetInfo(ConsumeType csmType,
                                                               final Set<String> reqTopicSet,
                                                               final String requiredParts,
                                                               final StringBuilder strBuffer) {
         Map<String, Long> requiredPartMap = new HashMap<>();
         ParamCheckResult retResult = new ParamCheckResult();
-        if (!isReqConsumeBand) {
+        if (csmType != ConsumeType.CONSUME_BAND) {
             retResult.setCheckData(requiredPartMap);
             return retResult;
         }
@@ -200,13 +199,15 @@ public class PBParameterUtils {
             return retResult;
         }
         GroupResCtrlEntity offsetResetGroupEntity =
-                defMetaDataManager.confGetGroupResCtrlConf(inConsumerInfo.getGroup());
+                defMetaDataManager.confGetGroupResCtrlConf(inConsumerInfo.getGroupName());
         if (masterConfig.isStartOffsetResetCheck()) {
             if (offsetResetGroupEntity == null) {
                 retResult.setCheckResult(false,
                         TErrCodeConstants.BAD_REQUEST,
-                        "[unauthorized subscribe] ConsumeGroup must be authorized by administrator before"
-                                + " using bound subscribe, please contact to administrator!");
+                        strBuffer.append("[unauthorized subscribe] ConsumeGroup must be ")
+                                .append("authorized by administrator before using bound subscribe")
+                                .append(", please contact to administrator!").toString());
+                strBuffer.delete(0, strBuffer.length());
                 return retResult;
             }
         }
@@ -232,167 +233,6 @@ public class PBParameterUtils {
         return retResult;
     }
 
-    // #lizard forgives
-    public static ParamCheckResult validConsumerExistInfo(ConsumerInfo inConsumerInfo,
-                                                          boolean isSelectBig,
-                                                          ConsumerBandInfo consumerBandInfo,
-                                                          final StringBuilder strBuffer) throws Exception {
-        // This part is mainly to check whether the newly accessed client is consistent with the existing
-        // consumer consumption target
-        ParamCheckResult retResult = new ParamCheckResult();
-        if (consumerBandInfo == null) {
-            retResult.setCheckData(inConsumerInfo);
-            return retResult;
-        }
-        // check whether the consumer behavior is consistent
-        if (inConsumerInfo.isRequireBound() != consumerBandInfo.isBandConsume()) {
-            if (inConsumerInfo.isRequireBound()) {
-                strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
-                        .append(" using bound subscribe is inconsistency ")
-                        .append("with other consumers using unbound subscribe in the group");
-            } else {
-                strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
-                        .append(" using unbound subscribe is inconsistency with other consumers")
-                        .append(" using bound subscribe in the group");
-            }
-            retResult.setCheckResult(false,
-                    TErrCodeConstants.BAD_REQUEST,
-                    strBuffer.toString());
-            logger.warn(strBuffer.toString());
-            return retResult;
-        }
-        // check the topics of consumption
-        List<ConsumerInfo> infoList = consumerBandInfo.getConsumerInfoList();
-        Set<String> existedTopics = consumerBandInfo.getTopicSet();
-        Map<String, TreeSet<String>> existedTopicConditions = consumerBandInfo.getTopicConditions();
-        if (existedTopics != null && !existedTopics.isEmpty()) {
-            if (existedTopics.size() != inConsumerInfo.getTopicSet().size()
-                    || !existedTopics.containsAll(inConsumerInfo.getTopicSet())) {
-
-                retResult.setCheckResult(false,
-                        TErrCodeConstants.BAD_REQUEST,
-                        strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
-                                .append(" subscribed topics ").append(inConsumerInfo.getTopicSet())
-                                .append(" is inconsistency with other consumers in the group, existedTopics: ")
-                                .append(existedTopics).toString());
-                logger.warn(strBuffer.toString());
-                return retResult;
-            }
-        }
-        if (infoList != null && !infoList.isEmpty()) {
-            boolean isCondEqual = true;
-            if (existedTopicConditions == null || existedTopicConditions.isEmpty()) {
-                if (inConsumerInfo.getTopicConditions().isEmpty()) {
-                    isCondEqual = true;
-                } else {
-                    isCondEqual = false;
-                    strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
-                            .append(" subscribe with filter condition ")
-                            .append(inConsumerInfo.getTopicConditions())
-                            .append(" is inconsistency with other consumers in the group: topic without conditions");
-                }
-            } else {
-                // check the filter conditions of the topic
-                if (inConsumerInfo.getTopicConditions().isEmpty()) {
-                    isCondEqual = false;
-                    strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
-                            .append(" subscribe without filter condition ")
-                            .append(" is inconsistency with other consumers in the group, existed topic conditions is ")
-                            .append(existedTopicConditions);
-                } else {
-                    Set<String> existedCondTopics = existedTopicConditions.keySet();
-                    Set<String> reqCondTopics = inConsumerInfo.getTopicConditions().keySet();
-                    if (existedCondTopics.size() != reqCondTopics.size()
-                            || !existedCondTopics.containsAll(reqCondTopics)) {
-                        isCondEqual = false;
-                        strBuffer.append("[Inconsistency subscribe] ")
-                                .append(inConsumerInfo.getConsumerId())
-                                .append(" subscribe with filter condition ")
-                                .append(inConsumerInfo.getTopicConditions())
-                                .append(" is inconsistency with other consumers in the group, ")
-                                .append("existed topic conditions is ")
-                                .append(existedTopicConditions);
-                    } else {
-                        isCondEqual = true;
-                        for (String topicKey : existedCondTopics) {
-                            if ((existedTopicConditions.get(topicKey).size()
-                                    != inConsumerInfo.getTopicConditions().get(topicKey).size())
-                                    || (!existedTopicConditions.get(topicKey).containsAll(inConsumerInfo
-                                    .getTopicConditions().get(topicKey)))) {
-                                isCondEqual = false;
-                                strBuffer.append("[Inconsistency subscribe] ")
-                                        .append(inConsumerInfo.getConsumerId())
-                                        .append(" subscribe with filter condition ")
-                                        .append(inConsumerInfo.getTopicConditions())
-                                        .append(" is inconsistency with other consumers ")
-                                        .append("in the group, existed topic conditions is ")
-                                        .append(existedTopicConditions);
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-            if (!isCondEqual) {
-                retResult.setCheckResult(false,
-                        TErrCodeConstants.BAD_REQUEST,
-                        strBuffer.toString());
-                logger.warn(strBuffer.toString());
-                return retResult;
-            }
-        }
-        if (inConsumerInfo.isRequireBound()) {
-            // If the sessionKey is inconsistent, it means that the previous round of consumption has not completely
-            // exited. In order to avoid the incomplete offset setting, it is necessary to completely clear the above
-            // data before resetting and consuming this round of consumption
-            if (!inConsumerInfo.getSessionKey().equals(consumerBandInfo.getSessionKey())) {
-                strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
-                        .append("'s sessionKey is inconsistency with other consumers in the group, required is ")
-                        .append(consumerBandInfo.getSessionKey()).append(", request is ")
-                        .append(inConsumerInfo.getSessionKey());
-                retResult.setCheckResult(false,
-                        TErrCodeConstants.BAD_REQUEST,
-                        strBuffer.toString());
-                logger.warn(strBuffer.toString());
-                return retResult;
-            }
-            // check the offset config
-            if (isSelectBig != consumerBandInfo.isSelectedBig()) {
-                strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
-                        .append("'s isSelectBig is inconsistency with other consumers in the group, required is ")
-                        .append(consumerBandInfo.isSelectedBig())
-                        .append(", request is ").append(isSelectBig);
-                retResult.setCheckResult(false,
-                        TErrCodeConstants.BAD_REQUEST,
-                        strBuffer.toString());
-                logger.warn(strBuffer.toString());
-                return retResult;
-            }
-            // check the consumers count
-            if (inConsumerInfo.getSourceCount() != consumerBandInfo.getSourceCount()) {
-                strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
-                        .append("'s sourceCount is inconsistency with other consumers in the group, required is ")
-                        .append(consumerBandInfo.getSourceCount())
-                        .append(", request is ").append(inConsumerInfo.getSourceCount());
-                retResult.setCheckResult(false,
-                        TErrCodeConstants.BAD_REQUEST,
-                        strBuffer.toString());
-                logger.warn(strBuffer.toString());
-                return retResult;
-            }
-        }
-        boolean registered = false;
-        if (infoList != null) {
-            for (ConsumerInfo info : infoList) {
-                if (info.getConsumerId().equals(inConsumerInfo.getConsumerId())) {
-                    registered = true;
-                }
-            }
-        }
-        retResult.setCheckData(registered);
-        return retResult;
-    }
-
     /**
      * Check the id of broker
      *
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java
new file mode 100644
index 0000000..72f0773
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.common.utils;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.TokenConstants;
+import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.inlong.tubemq.corebase.cluster.Partition;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+
+public class ClientSyncInfo {
+    private boolean updated = false;
+    private long brokerConfigId = TBaseConstants.META_VALUE_UNDEFINED;
+    private long topicMetaInfoId = TBaseConstants.META_VALUE_UNDEFINED;
+    private long lstAssignedTime = TBaseConstants.META_VALUE_UNDEFINED;
+    private boolean repPartInfo = false;
+    private final Set<Partition> subPartSet = new HashSet<>();
+
+    public ClientSyncInfo() {
+
+    }
+
+    public void updSubRepInfo(BrokerRunManager brokerRunManager,
+                              ClientMaster.ClientSubRepInfo clientSubRepInfo) {
+        if (clientSubRepInfo == null) {
+            return;
+        }
+        if (clientSubRepInfo.hasBrokerConfigId()) {
+            this.brokerConfigId = clientSubRepInfo.getBrokerConfigId();
+            this.updated = true;
+        }
+        if (clientSubRepInfo.hasTopicMetaInfoId()) {
+            this.topicMetaInfoId = clientSubRepInfo.getTopicMetaInfoId();
+            this.updated = true;
+        }
+        if (clientSubRepInfo.hasLstAssignedTime()) {
+            this.lstAssignedTime = clientSubRepInfo.getLstAssignedTime();
+            this.updated = true;
+        }
+        if (clientSubRepInfo.hasReportSubInfo()) {
+            this.repPartInfo = clientSubRepInfo.getReportSubInfo();
+            if (this.repPartInfo) {
+                for (String info : clientSubRepInfo.getPartSubInfoList()) {
+                    if (TStringUtils.isBlank(info)) {
+                        continue;
+                    }
+                    String[] strInfo = info.split(TokenConstants.SEGMENT_SEP);
+                    String[] strPartInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP);
+                    for (String partStr : strPartInfoSet) {
+                        String[] strPartInfo = partStr.split(TokenConstants.ATTR_SEP);
+                        BrokerInfo brokerInfo =
+                                brokerRunManager.getBrokerInfo(Integer.parseInt(strPartInfo[0]));
+                        if (brokerInfo == null) {
+                            continue;
+                        }
+                        subPartSet.add(new Partition(brokerInfo,
+                                strInfo[0], Integer.parseInt(strPartInfo[1])));
+                    }
+                }
+            }
+            this.updated = true;
+        }
+    }
+
+    public boolean isUpdated() {
+        return updated;
+    }
+
+    public long getBrokerConfigId() {
+        return brokerConfigId;
+    }
+
+    public long getTopicMetaInfoId() {
+        return topicMetaInfoId;
+    }
+
+    public long getLstAssignedTime() {
+        return lstAssignedTime;
+    }
+
+    public Tuple2<Boolean, Set<Partition>> getRepSubInfo() {
+        return new Tuple2<>(this.repPartInfo, this.subPartSet);
+    }
+
+    public void clear() {
+        this.updated = false;
+        this.brokerConfigId = TBaseConstants.META_VALUE_UNDEFINED;
+        this.topicMetaInfoId = TBaseConstants.META_VALUE_UNDEFINED;
+        this.lstAssignedTime = TBaseConstants.META_VALUE_UNDEFINED;
+        this.repPartInfo = false;
+        this.subPartSet.clear();
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index c40a6f1..0bee4de 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -41,7 +41,6 @@ import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
 import org.apache.inlong.tubemq.corebase.balance.EventStatus;
 import org.apache.inlong.tubemq.corebase.balance.EventType;
 import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
 import org.apache.inlong.tubemq.corebase.cluster.NodeAddrInfo;
 import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.corebase.cluster.ProducerInfo;
@@ -77,7 +76,6 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.Registe
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2CV2;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2P;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
-import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
@@ -112,8 +110,10 @@ import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHol
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
-import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeType;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerEventManager;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeproducer.ProducerInfoHolder;
 import org.apache.inlong.tubemq.server.master.utils.Chore;
@@ -178,7 +178,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
                 false, TBaseConstants.META_VALUE_UNDEFINED);
         this.producerHolder = new ProducerInfoHolder();
-        this.consumerHolder = new ConsumerInfoHolder();
+        this.consumerHolder = new ConsumerInfoHolder(this.masterConfig);
         this.consumerEventManager = new ConsumerEventManager(consumerHolder);
         this.topicPSInfoManager = new TopicPSInfoManager(this);
         this.loadBalancer = new DefaultLoadBalancer();
@@ -551,9 +551,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         }
         Set<String> reqTopicSet = (Set<String>) paramCheckResult.checkData;
         String requiredParts = request.hasRequiredPartition() ? request.getRequiredPartition() : "";
-        boolean isReqConsumeBand = (request.hasRequireBound() && request.getRequireBound());
+        ConsumeType csmType = (request.hasRequireBound() && request.getRequireBound())
+                ? ConsumeType.CONSUME_BAND : ConsumeType.CONSUME_NORMAL;
         final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : "";
-        paramCheckResult = PBParameterUtils.checkConsumerOffsetSetInfo(isReqConsumeBand,
+        paramCheckResult = PBParameterUtils.checkConsumerOffsetSetInfo(csmType,
                 reqTopicSet, requiredParts, strBuffer);
         if (!paramCheckResult.result) {
             builder.setErrCode(paramCheckResult.errCode);
@@ -568,13 +569,21 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 ? request.getSessionTime() : System.currentTimeMillis();
         int sourceCount = request.hasTotalCount()
                 ? request.getTotalCount() : -1;
-        int reqQryPriorityId = request.hasQryPriorityId()
+        int qryPriorityId = request.hasQryPriorityId()
                 ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
+        List<SubscribeInfo> subscribeList =
+                DataConverterUtil.convertSubInfo(request.getSubscribeInfoList());
+        boolean isNotAllocated = true;
+        if (CollectionUtils.isNotEmpty(subscribeList)
+                || ((request.hasNotAllocated() && !request.getNotAllocated()))) {
+            isNotAllocated = false;
+        }
         boolean isSelectBig = (!request.hasSelectBig() || request.getSelectBig());
+        // build consumer object
         ConsumerInfo inConsumerInfo =
                 new ConsumerInfo(consumerId, overtls, groupName,
-                        reqTopicSet, reqTopicConditions, isReqConsumeBand,
-                        sessionKey, sessionTime, sourceCount, requiredPartMap);
+                        reqTopicSet, reqTopicConditions, csmType,
+                        sessionKey, sessionTime, sourceCount, isSelectBig, requiredPartMap);
         paramCheckResult =
                 PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
                         masterConfig, defMetaDataManager, brokerRunManager, strBuffer);
@@ -606,38 +615,17 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         }
         // need removed for authorize center end
         Integer lid = null;
+        ConsumeGroupInfo consumeGroupInfo = null;
         try {
             lid = masterRowLock.getLock(null, StringUtils.getBytesUtf8(consumerId), true);
-            ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(groupName);
-            paramCheckResult =
-                    PBParameterUtils.validConsumerExistInfo(inConsumerInfo2, isSelectBig, consumerBandInfo, strBuffer);
-            if (!paramCheckResult.result) {
+            if (!consumerHolder.addConsumer(inConsumerInfo2,
+                    isNotAllocated, strBuffer, paramCheckResult)) {
                 builder.setErrCode(paramCheckResult.errCode);
                 builder.setErrMsg(paramCheckResult.errMsg);
                 return builder.build();
             }
-            boolean registered = (consumerBandInfo != null) && (boolean) paramCheckResult.checkData;
-            List<SubscribeInfo> subscribeList =
-                    DataConverterUtil.convertSubInfo(request.getSubscribeInfoList());
-            boolean isNotAllocated = true;
-            if (CollectionUtils.isNotEmpty(subscribeList)
-                    || ((request.hasNotAllocated() && !request.getNotAllocated()))) {
-                isNotAllocated = false;
-            }
-            if ((consumerBandInfo != null && consumerBandInfo.getConsumerInfoList() == null) || !registered) {
-                consumerHolder.addConsumer(inConsumerInfo2, isNotAllocated, isSelectBig);
-            }
-            for (String topic : reqTopicSet) {
-                ConcurrentHashSet<String> groupSet =
-                        topicPSInfoManager.getTopicSubInfo(topic);
-                if (groupSet == null) {
-                    groupSet = new ConcurrentHashSet<>();
-                    topicPSInfoManager.setTopicSubInfo(topic, groupSet);
-                }
-                if (!groupSet.contains(groupName)) {
-                    groupSet.add(groupName);
-                }
-            }
+            consumeGroupInfo = (ConsumeGroupInfo) paramCheckResult.checkData;
+            topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
             if (CollectionUtils.isNotEmpty(subscribeList)) {
                 Map<String, Map<String, Partition>> topicPartSubMap =
                         new HashMap<>();
@@ -663,8 +651,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             }
         }
         logger.info(strBuffer.append("[Consumer Register] ")
-            .append(consumerId).append(", isOverTLS=").append(overtls)
-            .append(", clientJDKVer=").append(clientJdkVer).toString());
+                .append(consumerId).append(", isOverTLS=").append(overtls)
+                .append(", clientJDKVer=").append(clientJdkVer).toString());
         strBuffer.delete(0, strBuffer.length());
         if (request.hasDefFlowCheckId() || request.hasGroupFlowCheckId()) {
             builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
@@ -693,7 +681,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             }
         }
         builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
-        builder.setNotAllocated(consumerHolder.isNotAllocated(groupName));
+        builder.setNotAllocated(consumeGroupInfo.isNotAllocate());
         builder.setSuccess(true);
         builder.setErrCode(TErrCodeConstants.SUCCESS);
         builder.setErrMsg("OK!");
@@ -742,8 +730,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         }
         final String groupName = (String) paramCheckResult.checkData;
         checkNodeStatus(clientId, strBuffer);
-        ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(groupName);
-        if (consumerBandInfo == null) {
+        ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName);
+        if (consumeGroupInfo == null) {
             builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
             builder.setErrMsg(strBuffer.append("Not found groupName ")
                     .append(groupName).append(" in holder!").toString());
@@ -752,7 +740,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         // authorize check
         CertifiedResult authorizeResult =
                 serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
-                        groupName, consumerBandInfo.getTopicSet(), consumerBandInfo.getTopicConditions(), rmtAddress);
+                        groupName, consumeGroupInfo.getTopicSet(), consumeGroupInfo.getTopicConditions(), rmtAddress);
         if (!authorizeResult.result) {
             builder.setErrCode(authorizeResult.errCode);
             builder.setErrMsg(authorizeResult.errInfo);
@@ -816,7 +804,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             logger.info(processedEvent.toStrBuilder(strBuffer).toString());
             strBuffer.delete(0, strBuffer.length());
             try {
-                consumerHolder.setAllocated(groupName);
+                consumeGroupInfo.settAllocated();
                 consumerEventManager.removeFirst(clientId);
             } catch (Throwable e) {
                 logger.warn("Unknown exception for remove first event:", e);
@@ -868,7 +856,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             }
         }
         builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
-        builder.setNotAllocated(consumerHolder.isNotAllocated(groupName));
+        builder.setNotAllocated(consumeGroupInfo.isNotAllocate());
         builder.setSuccess(true);
         builder.setErrCode(TErrCodeConstants.SUCCESS);
         builder.setErrMsg("OK!");
@@ -1289,8 +1277,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             return;
         }
         final boolean isStartBalance = startupBalance;
-        List<String> groupsNeedToBalance =
-                isStartBalance ? consumerHolder.getAllGroup() : getNeedToBalanceGroupList(strBuffer);
+        List<String> groupsNeedToBalance = isStartBalance
+                ? consumerHolder.getAllServerBalanceGroups() : getNeedToBalanceGroups(strBuffer);
         strBuffer.delete(0, strBuffer.length());
         if (!groupsNeedToBalance.isEmpty()) {
             // set parallel rebalance signal
@@ -1360,12 +1348,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         // choose different load balance strategy
         if (isFirstReb) {
             finalSubInfoMap = this.loadBalancer.bukAssign(consumerHolder,
-                    brokerRunManager, groups, defMetaDataManager,
-                    masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
+                    brokerRunManager, groups, defMetaDataManager, strBuffer);
         } else {
             finalSubInfoMap = this.loadBalancer.balanceCluster(currentSubInfo,
-                    consumerHolder, brokerRunManager, groups, defMetaDataManager,
-                    masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
+                    consumerHolder, brokerRunManager, groups, defMetaDataManager, strBuffer);
         }
         // allocate partitions to consumers
         for (Map.Entry<String, Map<String, List<Partition>>> entry : finalSubInfoMap.entrySet()) {
@@ -1376,15 +1362,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             if (consumerId == null) {
                 continue;
             }
-            Tuple2<String, ConsumerInfo> tupleInfo =
-                    consumerHolder.getConsumeTupleInfo(consumerId);
-            if (tupleInfo == null
-                    || tupleInfo.getF0() == null
-                    || tupleInfo.getF1() == null) {
+            ConsumerInfo consumerInfo =
+                    consumerHolder.getConsumerInfo(consumerId);
+            if (consumerInfo == null) {
                 continue;
             }
             Set<String> blackTopicSet =
-                    defMetaDataManager.getDisableConsumeTopicByGroupName(tupleInfo.getF0());
+                    defMetaDataManager.getDisableTopicByGroupName(consumerInfo.getGroupName());
             Map<String, List<Partition>> topicSubPartMap = entry.getValue();
             List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
             List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -1405,7 +1389,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                         currentPartMap = new HashMap<>();
                     }
                 }
-                if (tupleInfo.getF1().isOverTLS()) {
+                if (consumerInfo.isOverTLS()) {
                     for (Partition currentPart : currentPartMap.values()) {
                         if (!blackTopicSet.contains(currentPart.getTopic())) {
                             boolean found = false;
@@ -1421,8 +1405,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                             }
                         }
                         deletedSubInfoList
-                                .add(new SubscribeInfo(consumerId, tupleInfo.getF0(),
-                                        tupleInfo.getF1().isOverTLS(), currentPart));
+                                .add(new SubscribeInfo(consumerId, consumerInfo.getGroupName(),
+                                        consumerInfo.isOverTLS(), currentPart));
                     }
                     for (Partition finalPart : finalPartList) {
                         if (!blackTopicSet.contains(finalPart.getTopic())) {
@@ -1438,7 +1422,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                                 continue;
                             }
                             addedSubInfoList.add(new SubscribeInfo(consumerId,
-                                    tupleInfo.getF0(), true, finalPart));
+                                    consumerInfo.getGroupName(), true, finalPart));
                         }
                     }
                 } else {
@@ -1446,14 +1430,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                         if ((blackTopicSet.contains(currentPart.getTopic()))
                                 || (!finalPartList.contains(currentPart))) {
                             deletedSubInfoList.add(new SubscribeInfo(consumerId,
-                                    tupleInfo.getF0(), false, currentPart));
+                                    consumerInfo.getGroupName(), false, currentPart));
                         }
                     }
                     for (Partition finalPart : finalPartList) {
                         if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
                                 && (!blackTopicSet.contains(finalPart.getTopic()))) {
                             addedSubInfoList.add(new SubscribeInfo(consumerId,
-                                    tupleInfo.getF0(), false, finalPart));
+                                    consumerInfo.getGroupName(), false, finalPart));
                         }
                     }
                 }
@@ -1516,16 +1500,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             if (consumerId == null) {
                 continue;
             }
-            Tuple2<String, ConsumerInfo> tupleInfo =
-                    consumerHolder.getConsumeTupleInfo(consumerId);
-            if (tupleInfo == null
-                    || tupleInfo.getF0() == null
-                    || tupleInfo.getF1() == null) {
+            ConsumerInfo consumerInfo =
+                    consumerHolder.getConsumerInfo(consumerId);
+            if (consumerInfo == null) {
                 continue;
             }
             // allocate partitions to consumers
             Set<String> blackTopicSet =
-                    defMetaDataManager.getDisableConsumeTopicByGroupName(tupleInfo.getF0());
+                    defMetaDataManager.getDisableTopicByGroupName(consumerInfo.getGroupName());
             Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
             List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
             List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -1552,15 +1534,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                     if ((blackTopicSet.contains(currentPart.getTopic()))
                             || (finalPartMap.get(currentPart.getPartitionKey()) == null)) {
                         deletedSubInfoList
-                                .add(new SubscribeInfo(consumerId, tupleInfo.getF0(),
-                                        tupleInfo.getF1().isOverTLS(), currentPart));
+                                .add(new SubscribeInfo(consumerId, consumerInfo.getGroupName(),
+                                        consumerInfo.isOverTLS(), currentPart));
                     }
                 }
                 for (Partition finalPart : finalPartMap.values()) {
                     if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
                             && (!blackTopicSet.contains(finalPart.getTopic()))) {
-                        addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.getF0(),
-                                tupleInfo.getF1().isOverTLS(), finalPart));
+                        addedSubInfoList.add(new SubscribeInfo(consumerId,
+                                consumerInfo.getGroupName(), consumerInfo.isOverTLS(), finalPart));
                     }
                 }
             }
@@ -1665,7 +1647,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
      * @param strBuffer
      * @return
      */
-    private List<String> getNeedToBalanceGroupList(final StringBuilder strBuffer) {
+    private List<String> getNeedToBalanceGroups(final StringBuilder strBuffer) {
         List<String> groupsNeedToBalance = new ArrayList<>();
         Set<String> groupHasUnfinishedEvent = new HashSet<>();
         if (consumerEventManager.hasEvent()) {
@@ -1677,7 +1659,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 if (consumerId == null) {
                     continue;
                 }
-                String group = consumerHolder.getGroup(consumerId);
+                String group = consumerHolder.getGroupName(consumerId);
                 if (group == null) {
                     continue;
                 }
@@ -1698,7 +1680,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             }
         }
         consumerEventManager.updateUnfinishedCountMap(groupHasUnfinishedEvent);
-        List<String> allGroups = consumerHolder.getAllGroup();
+        List<String> allGroups = consumerHolder.getAllServerBalanceGroups();
         if (groupHasUnfinishedEvent.isEmpty()) {
             for (String group : allGroups) {
                 if (group != null) {
@@ -1899,18 +1881,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             String group = nodeStrs[1];
             Integer lid = null;
             try {
-                lid = masterRowLock.getLock(null, StringUtils.getBytesUtf8(consumerId), true);
+                lid = masterRowLock.getLock(null,
+                        StringUtils.getBytesUtf8(consumerId), true);
                 ConsumerInfo info = consumerHolder.removeConsumer(group, consumerId);
                 currentSubInfo.remove(consumerId);
                 consumerEventManager.removeAll(consumerId);
-                List<ConsumerInfo> consumerList =
-                        consumerHolder.getConsumerList(group);
-                if (consumerList == null || consumerList.isEmpty()) {
-                    if (info != null) {
-                        for (String topic : info.getTopicSet()) {
-                            topicPSInfoManager.removeTopicSubInfo(topic, group);
-                        }
-                    }
+                if (info != null && consumerHolder.isConsumeGroupEmpty(group)) {
+                    topicPSInfoManager.rmvGroupSubTopicInfo(group, info.getTopicSet());
                 }
             } catch (IOException e) {
                 logger.warn("Failed to lock.", e);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
index c8bfdab..39e4830 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
@@ -30,16 +30,13 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.tubemq.corebase.TokenConstants;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
 import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage;
-import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorageInfo;
 import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
-import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
-import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.RebProcessInfo;
@@ -63,7 +60,6 @@ public class DefaultLoadBalancer implements LoadBalancer {
      * @param brokerRunManager
      * @param groupSet
      * @param metaDataManager
-     * @param defAllowBClientRate
      * @param strBuffer
      * @return
      */
@@ -74,36 +70,31 @@ public class DefaultLoadBalancer implements LoadBalancer {
             BrokerRunManager brokerRunManager,
             List<String> groupSet,
             MetaDataManager metaDataManager,
-            int defAllowBClientRate,
             StringBuilder strBuffer) {
         // #lizard forgives
         // load balance according to group
         Map<String/* consumer */,
                 Map<String/* topic */, List<Partition>>> finalSubInfoMap =
                 new HashMap<>();
-        Map<String, RebProcessInfo> rejGroupClientINfoMap = new HashMap<>();
+        Map<String, RebProcessInfo> rejGroupClientInfoMap = new HashMap<>();
         Set<String> onlineOfflineGroupSet = new HashSet<>();
-        Set<String> bandGroupSet = new HashSet<>();
+        Set<String> boundGroupSet = new HashSet<>();
         for (String group : groupSet) {
             if (group == null) {
                 continue;
             }
-            ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(group);
-            if (consumerBandInfo == null) {
+            ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(group);
+            if (consumeGroupInfo == null || consumeGroupInfo.isClientBalance()) {
                 continue;
             }
-            List<ConsumerInfo> consumerList = consumerBandInfo.getConsumerInfoList();
+            List<ConsumerInfo> consumerList =
+                    consumeGroupInfo.getConsumerInfoList();
             if (CollectionUtils.isEmpty(consumerList)) {
                 continue;
             }
-            // deal with regular consumer allocation, band consume allocation not in this part
-            Map<String, String> partsConsumerMap =
-                    consumerBandInfo.getPartitionInfoMap();
-            if (consumerBandInfo.isBandConsume()
-                    && consumerBandInfo.isNotAllocate()
-                    && !partsConsumerMap.isEmpty()
-                    && consumerBandInfo.getAllocatedTimes() < 2) {
-                bandGroupSet.add(group);
+            // deal with regular consumer allocation, bound consume not in this part
+            if (consumeGroupInfo.isUnReadyServerBalance()) {
+                boundGroupSet.add(group);
                 continue;
             }
             List<ConsumerInfo> newConsumerList = new ArrayList<>();
@@ -115,9 +106,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
             if (CollectionUtils.isEmpty(newConsumerList)) {
                 continue;
             }
-            Set<String> topicSet = consumerBandInfo.getTopicSet();
-            if (!consumerBandInfo.isBandConsume()
-                    && consumerBandInfo.getRebalanceCheckStatus() <= 0) {
+            Set<String> topicSet = consumeGroupInfo.getTopicSet();
+            if (consumeGroupInfo.needResourceCheck()) {
                 // check if current client meet minimal requirements
                 GroupResCtrlEntity offsetResetGroupEntity =
                         metaDataManager.confGetGroupResCtrlConf(group);
@@ -125,7 +115,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                         && offsetResetGroupEntity.getAllowedBrokerClientRate() > 0)
                         ? offsetResetGroupEntity.getAllowedBrokerClientRate() : -2;
                 int allowRate = confAllowBClientRate > 0
-                        ? confAllowBClientRate : defAllowBClientRate;
+                        ? confAllowBClientRate : consumerHolder.getDefResourceRate();
                 int maxBrokerCount =
                         brokerRunManager.getSubTopicMaxBrokerCount(topicSet);
                 int curBClientRate = (int) Math.floor(maxBrokerCount / newConsumerList.size());
@@ -134,33 +124,33 @@ public class DefaultLoadBalancer implements LoadBalancer {
                     if (maxBrokerCount % allowRate != 0) {
                         minClientCnt += 1;
                     }
-                    consumerHolder.setCurConsumeBClientInfo(group, defAllowBClientRate,
-                            confAllowBClientRate, curBClientRate, minClientCnt, false);
-                    if (consumerBandInfo.isRebalanceCheckPrint()) {
+                    consumeGroupInfo.setConsumeResourceInfo(confAllowBClientRate,
+                            curBClientRate, minClientCnt, false);
+                    if (consumeGroupInfo.isEnableBalanceChkPrint()) {
                         logger.info(strBuffer.append("[UnBound Alloc 2] Not allocate partition :group(")
                                 .append(group).append(")'s consumer getCachedSize(")
-                                .append(consumerBandInfo.getGroupCnt())
+                                .append(consumeGroupInfo.getGroupCnt())
                                 .append(") low than min required client count:")
                                 .append(minClientCnt).toString());
                         strBuffer.delete(0, strBuffer.length());
                     }
                     continue;
                 } else {
-                    consumerHolder.setCurConsumeBClientInfo(group,
-                            defAllowBClientRate, confAllowBClientRate, curBClientRate, -2, true);
+                    consumeGroupInfo.setConsumeResourceInfo(confAllowBClientRate,
+                            curBClientRate, -2, true);
                 }
             }
             RebProcessInfo rebProcessInfo = new RebProcessInfo();
-            if (!consumerBandInfo.isRebalanceMapEmpty()) {
+            if (!consumeGroupInfo.isBalanceMapEmpty()) {
                 rebProcessInfo = consumerHolder.getNeedRebNodeList(group);
                 if (!rebProcessInfo.isProcessInfoEmpty()) {
-                    rejGroupClientINfoMap.put(group, rebProcessInfo);
+                    rejGroupClientInfoMap.put(group, rebProcessInfo);
                 }
             }
             List<ConsumerInfo> newConsumerList2 = new ArrayList<>();
             Map<String, Partition> partMap =
                     brokerRunManager.getSubBrokerAcceptSubParts(topicSet);
-            Map<String, NodeRebInfo> rebProcessInfoMap = consumerBandInfo.getRebalanceMap();
+            Map<String, NodeRebInfo> rebProcessInfoMap = consumeGroupInfo.getBalanceMap();
             for (ConsumerInfo consumer : newConsumerList) {
                 Map<String, List<Partition>> partitions = new HashMap<>();
                 finalSubInfoMap.put(consumer.getConsumerId(), partitions);
@@ -176,7 +166,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                             newConsumerList2.add(consumer);
                         }
                         for (Entry<String, Map<String, Partition>> entry : relation.entrySet()) {
-                            partitions.put(entry.getKey(), new ArrayList<Partition>());
+                            partitions.put(entry.getKey(), new ArrayList<>());
                         }
                         continue;
                     }
@@ -223,19 +213,18 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 }
             }
         }
-        if (!bandGroupSet.isEmpty()) {
-            for (String group : bandGroupSet) {
+        if (!boundGroupSet.isEmpty()) {
+            for (String group : boundGroupSet) {
                 groupsNeedToBalance.remove(group);
             }
         }
         if (!groupsNeedToBalance.isEmpty()) {
-            finalSubInfoMap =
-                    balance(finalSubInfoMap, consumerHolder, brokerRunManager,
-                            groupsNeedToBalance, clusterState, rejGroupClientINfoMap);
+            balance(finalSubInfoMap, consumerHolder, brokerRunManager,
+                    groupsNeedToBalance, clusterState, rejGroupClientInfoMap);
         }
-        if (!rejGroupClientINfoMap.isEmpty()) {
+        if (!rejGroupClientInfoMap.isEmpty()) {
             for (Entry<String, RebProcessInfo> entry :
-                    rejGroupClientINfoMap.entrySet()) {
+                    rejGroupClientInfoMap.entrySet()) {
                 consumerHolder.setRebNodeProcessed(entry.getKey(),
                         entry.getValue().needProcessList);
             }
@@ -244,7 +233,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
     }
 
     // #lizard forgives
-    private Map<String, Map<String, List<Partition>>> balance(
+    private void balance(
             Map<String, Map<String, List<Partition>>> clusterState,
             ConsumerInfoHolder consumerHolder,
             BrokerRunManager brokerRunManager,
@@ -253,24 +242,24 @@ public class DefaultLoadBalancer implements LoadBalancer {
             Map<String, RebProcessInfo> rejGroupClientInfoMap) {
         // according to group
         for (String group : groupSet) {
-            ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(group);
-            if (consumerBandInfo == null) {
+            ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(group);
+            if (consumeGroupInfo == null || consumeGroupInfo.isClientBalance()) {
                 continue;
             }
             // filter consumer which don't need to handle
             List<ConsumerInfo> consumerList = new ArrayList<>();
-            List<ConsumerInfo> consumerList1 = consumerBandInfo.getConsumerInfoList();
+            List<ConsumerInfo> consumerList1 = consumeGroupInfo.getConsumerInfoList();
             RebProcessInfo rebProcessInfo = rejGroupClientInfoMap.get(group);
             if (rebProcessInfo != null) {
                 for (ConsumerInfo consumerInfo : consumerList1) {
+                    if (consumerInfo == null) {
+                        continue;
+                    }
                     if (rebProcessInfo.needProcessList.contains(consumerInfo.getConsumerId())
                             || rebProcessInfo.needEscapeList.contains(consumerInfo.getConsumerId())) {
                         Map<String, List<Partition>> partitions2 =
-                                clusterState.get(consumerInfo.getConsumerId());
-                        if (partitions2 == null) {
-                            partitions2 = new HashMap<>();
-                            clusterState.put(consumerInfo.getConsumerId(), partitions2);
-                        }
+                                clusterState.computeIfAbsent(
+                                        consumerInfo.getConsumerId(), k -> new HashMap<>());
                         Map<String, Map<String, Partition>> relation =
                                 oldClusterState.get(consumerInfo.getConsumerId());
                         if (relation != null) {
@@ -289,7 +278,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 continue;
             }
             // sort consumer and partitions, then mod
-            Set<String> topics = consumerBandInfo.getTopicSet();
+            Set<String> topics = consumeGroupInfo.getTopicSet();
             Map<String, Partition> psPartMap =
                     brokerRunManager.getSubBrokerAcceptSubParts(topics);
             int min = psPartMap.size() / consumerList.size();
@@ -359,24 +348,16 @@ public class DefaultLoadBalancer implements LoadBalancer {
                     assign(partitionToMove.poll(), clusterState, consumerId);
                 }
             }
-
         }
-        return clusterState;
     }
 
     private void assign(Partition partition,
                         Map<String, Map<String, List<Partition>>> clusterState,
                         String consumerId) {
-        Map<String, List<Partition>> partitions = clusterState.get(consumerId);
-        if (partitions == null) {
-            partitions = new HashMap<>();
-            clusterState.put(consumerId, partitions);
-        }
-        List<Partition> ps = partitions.get(partition.getTopic());
-        if (ps == null) {
-            ps = new ArrayList<>();
-            partitions.put(partition.getTopic(), ps);
-        }
+        Map<String, List<Partition>> partitions =
+                clusterState.computeIfAbsent(consumerId, k -> new HashMap<>());
+        List<Partition> ps = partitions.computeIfAbsent(
+                partition.getTopic(), k -> new ArrayList<>());
         ps.add(partition);
     }
 
@@ -424,16 +405,10 @@ public class DefaultLoadBalancer implements LoadBalancer {
             if (searched == 0) {
                 return;
             }
-            Map<String, List<Partition>> partitions = clusterState.get(consumer.getConsumerId());
-            if (partitions == null) {
-                partitions = new HashMap<>();
-                clusterState.put(consumer.getConsumerId(), partitions);
-            }
-            List<Partition> ps = partitions.get(partition.getTopic());
-            if (ps == null) {
-                ps = new ArrayList<>();
-                partitions.put(partition.getTopic(), ps);
-            }
+            Map<String, List<Partition>> partitions =
+                    clusterState.computeIfAbsent(consumer.getConsumerId(), k -> new HashMap<>());
+            List<Partition> ps = partitions.computeIfAbsent(
+                    partition.getTopic(), k -> new ArrayList<>());
             ps.add(partition);
         }
     }
@@ -489,7 +464,6 @@ public class DefaultLoadBalancer implements LoadBalancer {
      * @param brokerRunManager
      * @param groupSet
      * @param metaDataManager
-     * @param defAllowBClientRate
      * @param strBuffer
      * @return
      */
@@ -499,40 +473,32 @@ public class DefaultLoadBalancer implements LoadBalancer {
             BrokerRunManager brokerRunManager,
             List<String> groupSet,
             MetaDataManager metaDataManager,
-            int defAllowBClientRate,
             StringBuilder strBuffer) {
         // #lizard forgives
         // regular consumer allocate operation
         Map<String, Map<String, List<Partition>>> finalSubInfoMap =
                 new HashMap<>();
         for (String group : groupSet) {
-            ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(group);
-            // filter empty consumer
-            if (consumerBandInfo == null) {
+            ConsumeGroupInfo consumeGroupInfo =
+                    consumerHolder.getConsumeGroupInfo(group);
+            if (consumeGroupInfo == null
+                    || consumeGroupInfo.isClientBalance()
+                    || consumeGroupInfo.isUnReadyServerBalance()) {
                 continue;
             }
-
-            List<ConsumerInfo> consumerList = consumerBandInfo.getConsumerInfoList();
+            List<ConsumerInfo> consumerList = consumeGroupInfo.getConsumerInfoList();
             if (CollectionUtils.isEmpty(consumerList)) {
                 continue;
             }
-            Map<String, String> partsConsumerMap =
-                    consumerBandInfo.getPartitionInfoMap();
-            if (consumerBandInfo.isBandConsume()
-                    && consumerBandInfo.isNotAllocate()
-                    && !partsConsumerMap.isEmpty()
-                    && consumerBandInfo.getAllocatedTimes() < 2) {
-                continue;
-            }
             // check if current client meet minimal requirements
-            Set<String> topicSet = consumerBandInfo.getTopicSet();
+            Set<String> topicSet = consumeGroupInfo.getTopicSet();
             GroupResCtrlEntity offsetResetGroupEntity =
                     metaDataManager.confGetGroupResCtrlConf(group);
             int confAllowBClientRate = (offsetResetGroupEntity != null
                     && offsetResetGroupEntity.getAllowedBrokerClientRate() > 0)
                     ? offsetResetGroupEntity.getAllowedBrokerClientRate() : -2;
             int allowRate = confAllowBClientRate > 0
-                    ? confAllowBClientRate : defAllowBClientRate;
+                    ? confAllowBClientRate : consumerHolder.getDefResourceRate();
             int maxBrokerCount =
                     brokerRunManager.getSubTopicMaxBrokerCount(topicSet);
             int curBClientRate = (int) Math.floor(maxBrokerCount / consumerList.size());
@@ -541,21 +507,20 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 if (maxBrokerCount % allowRate != 0) {
                     minClientCnt += 1;
                 }
-                consumerHolder.setCurConsumeBClientInfo(group,
-                        defAllowBClientRate, confAllowBClientRate,
+                consumeGroupInfo.setConsumeResourceInfo(confAllowBClientRate,
                         curBClientRate, minClientCnt, false);
-                if (consumerBandInfo.isRebalanceCheckPrint()) {
+                if (consumeGroupInfo.isEnableBalanceChkPrint()) {
                     logger.info(strBuffer.append("[UnBound Alloc 1] Not allocate partition :group(")
                             .append(group).append(")'s consumer getCachedSize(")
-                            .append(consumerBandInfo.getGroupCnt())
+                            .append(consumeGroupInfo.getGroupCnt())
                             .append(") low than min required client count:")
                             .append(minClientCnt).toString());
                     strBuffer.delete(0, strBuffer.length());
                 }
                 continue;
             } else {
-                consumerHolder.setCurConsumeBClientInfo(group,
-                        defAllowBClientRate, confAllowBClientRate, curBClientRate, -2, true);
+                consumeGroupInfo.setConsumeResourceInfo(confAllowBClientRate,
+                        curBClientRate, -2, true);
             }
             // sort and mod
             Collections.sort(consumerList);
@@ -568,16 +533,9 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 for (int i = 0; i < consumerList.size(); i++) {
                     String consumerId = consumerList.get(i).getConsumerId();
                     Map<String, List<Partition>> topicSubPartMap =
-                            finalSubInfoMap.get(consumerId);
-                    if (topicSubPartMap == null) {
-                        topicSubPartMap = new HashMap<>();
-                        finalSubInfoMap.put(consumerId, topicSubPartMap);
-                    }
-                    List<Partition> partList = topicSubPartMap.get(topic);
-                    if (partList == null) {
-                        partList = new ArrayList<>();
-                        topicSubPartMap.put(topic, partList);
-                    }
+                            finalSubInfoMap.computeIfAbsent(consumerId, k -> new HashMap<>());
+                    List<Partition> partList =
+                            topicSubPartMap.computeIfAbsent(topic, k -> new ArrayList<>());
                     int startIndex = partsPerConsumer * i + Math.min(i, consumersWithExtraPart);
                     int parts = partsPerConsumer + ((i + 1) > consumersWithExtraPart ? 0 : 1);
                     for (int j = startIndex; j < startIndex + parts; j++) {
@@ -650,32 +608,23 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 continue;
             }
             // filter band consumer
-            ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(group);
-            if (consumerBandInfo == null) {
+            ConsumeGroupInfo consumeGroupInfo =
+                    consumerHolder.getConsumeGroupInfo(group);
+            if (consumeGroupInfo == null
+                    || consumeGroupInfo.isGroupEmpty()
+                    || consumeGroupInfo.isNotNeedBoundBalance()) {
                 continue;
             }
-            List<ConsumerInfo> consumerList = consumerBandInfo.getConsumerInfoList();
-            if (CollectionUtils.isEmpty(consumerList)) {
-                continue;
-            }
-            Map<String, String> partsConsumerMap =
-                    consumerBandInfo.getPartitionInfoMap();
-            if (!consumerBandInfo.isBandConsume()
-                    || !consumerBandInfo.isNotAllocate()
-                    || partsConsumerMap.isEmpty()
-                    || consumerBandInfo.getAllocatedTimes() >= 2) {
-                continue;
-            }
-            if (!consumerBandInfo.isGroupFullSize()) {
+            if (!consumeGroupInfo.isGroupFullSize()) {
                 // check if client size meet minimal requirements
                 Long checkCycle = consumerHolder.addCurCheckCycle(group);
                 if (isResetRebalance) {
                     if (checkCycle != null && checkCycle % 15 == 0) {
                         logger.info(strBuffer.append("[Bound Alloc 2] Not allocate partition :group(")
                                 .append(group).append(")'s consumer getCachedSize(")
-                                .append(consumerBandInfo.getGroupCnt())
+                                .append(consumeGroupInfo.getGroupCnt())
                                 .append(") low than required source count:")
-                                .append(consumerBandInfo.getSourceCount())
+                                .append(consumeGroupInfo.getSourceCount())
                                 .append(", checked cycle is ")
                                 .append(checkCycle).toString());
                         strBuffer.delete(0, strBuffer.length());
@@ -683,57 +632,35 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 } else {
                     logger.info(strBuffer.append("[Bound Alloc 1] Not allocate partition :group(")
                             .append(group).append(")'s consumer getCachedSize(")
-                            .append(consumerBandInfo.getGroupCnt())
+                            .append(consumeGroupInfo.getGroupCnt())
                             .append(") low than required source count:")
-                            .append(consumerBandInfo.getSourceCount()).toString());
+                            .append(consumeGroupInfo.getSourceCount()).toString());
                     strBuffer.delete(0, strBuffer.length());
                 }
                 continue;
             }
-            // actual reset offset
-            Map<String, Long> partsOffsetMap = consumerBandInfo.getPartOffsetMap();
-            List<OffsetStorageInfo> offsetInfoList = new ArrayList<>();
-            Map<String, Partition> partitionMap =
-                    brokerRunManager.getSubBrokerAcceptSubParts(consumerBandInfo.getTopicSet());
+            Map<String, Partition> partPubMap =
+                    brokerRunManager.getSubBrokerAcceptSubParts(consumeGroupInfo.getTopicSet());
+            Map<String, Partition> partitionMap = new HashMap<>();
+            for (Partition partition : partPubMap.values()) {
+                partitionMap.put(partition.getPartitionKey(), partition);
+            }
+            Map<String, String> partsConsumerMap =
+                    consumeGroupInfo.getPartitionInfoMap();
             for (Entry<String, String> entry : partsConsumerMap.entrySet()) {
                 Partition foundPart = partitionMap.get(entry.getKey());
                 if (foundPart != null) {
-                    if (partsOffsetMap.get(entry.getKey()) != null) {
-                        offsetInfoList.add(new OffsetStorageInfo(foundPart.getTopic(),
-                                foundPart.getBroker().getBrokerId(),
-                                foundPart.getPartitionId(),
-                                partsOffsetMap.get(entry.getKey()), 0));
-                    }
                     String consumerId = entry.getValue();
                     Map<String, Map<String, Partition>> topicSubPartMap =
-                            finalSubInfoMap.get(consumerId);
-                    if (topicSubPartMap == null) {
-                        topicSubPartMap = new HashMap<>();
-                        finalSubInfoMap.put(consumerId, topicSubPartMap);
-                    }
-                    Map<String, Partition> partMap = topicSubPartMap.get(foundPart.getTopic());
-                    if (partMap == null) {
-                        partMap = new HashMap<>();
-                        topicSubPartMap.put(foundPart.getTopic(), partMap);
-                    }
+                            finalSubInfoMap.computeIfAbsent(consumerId, k -> new HashMap<>());
+                    Map<String, Partition> partMap =
+                            topicSubPartMap.computeIfAbsent(
+                                    foundPart.getTopic(), k -> new HashMap<>());
                     partMap.put(foundPart.getPartitionKey(), foundPart);
                     partitionMap.remove(entry.getKey());
-                } else {
-                    String[] partitionKeyItems = entry.getKey().split(TokenConstants.ATTR_SEP);
-                    BrokerConfEntity brokerConfEntity =
-                            metaDataManager.getBrokerConfByBrokerId(Integer.parseInt(partitionKeyItems[0]));
-                    if (brokerConfEntity != null) {
-                        if (partsOffsetMap.get(entry.getKey()) != null) {
-                            offsetInfoList.add(new OffsetStorageInfo(partitionKeyItems[1],
-                                    brokerConfEntity.getBrokerId(),
-                                    Integer.parseInt(partitionKeyItems[2]),
-                                    partsOffsetMap.get(entry.getKey()), 0));
-                        }
-                    }
                 }
             }
-            zkOffsetStorage.commitOffset(group, offsetInfoList, false);
-            consumerHolder.addAllocatedTimes(group);
+            consumeGroupInfo.addAllocatedTimes();
         }
         return finalSubInfoMap;
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/LoadBalancer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/LoadBalancer.java
index e9f3983..b34faf9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/LoadBalancer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/LoadBalancer.java
@@ -19,11 +19,12 @@ package org.apache.inlong.tubemq.server.master.balance;
 
 import java.util.List;
 import java.util.Map;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
+
 import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage;
 import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
 
 public interface LoadBalancer {
@@ -34,7 +35,6 @@ public interface LoadBalancer {
             BrokerRunManager brokerRunManager,
             List<String> groups,
             MetaDataManager metaDataManager,
-            int defAllowBClientRate,
             StringBuilder sBuilder);
 
     Map<String, Map<String, Map<String, Partition>>> resetBalanceCluster(
@@ -50,7 +50,6 @@ public interface LoadBalancer {
                                                         BrokerRunManager brokerRunManager,
                                                         List<String> groups,
                                                         MetaDataManager metaDataManager,
-                                                        int defAllowBClientRate,
                                                         StringBuilder sBuilder);
 
     Map<String, Map<String, Map<String, Partition>>> resetBukAssign(ConsumerInfoHolder consumerHolder,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index b68cd99..9ae2a42 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -1968,7 +1968,7 @@ public class MetaDataManager implements Server {
         return metaStoreService.getConsumeCtrlByTopicName(topicName);
     }
 
-    public Set<String> getDisableConsumeTopicByGroupName(String groupName) {
+    public Set<String> getDisableTopicByGroupName(String groupName) {
         Set<String> disTopicSet = new HashSet<>();
         List<GroupConsumeCtrlEntity> qryResult =
                 metaStoreService.getConsumeCtrlByGroupName(groupName);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
index 56137a1..f646be9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
@@ -48,35 +48,52 @@ public class TopicPSInfoManager {
      * @param topic  query topic
      * @return  query result
      */
-    public ConcurrentHashSet<String> getTopicSubInfo(String topic) {
+    public Set<String> getTopicSubInfo(String topic) {
         return topicSubInfoMap.get(topic);
     }
 
     /**
-     * Set groups for a topic
+     * Record the topics that a group subscribes to
      *
-     * @param topic     topic target
-     * @param groupSet  group subscribed
+     * @param groupName    the name of the subscribing group
+     * @param topicSet     the set of topics the group subscribes to
      */
-    public void setTopicSubInfo(String topic,
-                                ConcurrentHashSet<String> groupSet) {
-        topicSubInfoMap.put(topic, groupSet);
+    public void addGroupSubTopicInfo(String groupName, Set<String> topicSet) {
+        for (String topic : topicSet) {
+            ConcurrentHashSet<String> groupSet = topicSubInfoMap.get(topic);
+            if (groupSet == null) {
+                ConcurrentHashSet<String> tmpGroupSet =
+                        new ConcurrentHashSet<>();
+                groupSet = topicSubInfoMap.putIfAbsent(topic, tmpGroupSet);
+                if (groupSet == null) {
+                    groupSet = tmpGroupSet;
+                }
+            }
+            groupSet.add(groupName);
+        }
     }
 
     /**
-     * Remove a group from the group set for a specific topic
+     * Remove the group from the subscriber set of each given topic
      *
-     * @param topic topic condition
-     * @param group group condition
-     * @return true if removed, false if not record
+     * @param group    the name of the group whose subscriptions are removed
+     * @param topicSet the set of topics the group was subscribed to
      */
-    public boolean removeTopicSubInfo(String topic,
-                                      String group) {
-        ConcurrentHashSet<String> groupSet = getTopicSubInfo(topic);
-        if (groupSet != null) {
-            return groupSet.remove(group);
+    public void rmvGroupSubTopicInfo(String group, Set<String> topicSet) {
+        if (topicSet == null || group == null) {
+            return;
+        }
+        ConcurrentHashSet<String> groupSet;
+        for (String topic : topicSet) {
+            if (topic == null) {
+                continue;
+            }
+            groupSet = topicSubInfoMap.get(topic);
+            if (groupSet == null) {
+                continue;
+            }
+            groupSet.remove(group);
         }
-        return true;
     }
 
     /**
@@ -85,30 +102,17 @@ public class TopicPSInfoManager {
      * @param topic  query topic
      * @return   target published producerId set
      */
-    public ConcurrentHashSet<String> getTopicPubInfo(String topic) {
+    public Set<String> getTopicPubInfo(String topic) {
         return topicPubInfoMap.get(topic);
     }
 
     /**
-     * Set producer IDs for a topic
-     *
-     * @param topic           topic target
-     * @param producerIdSet   topic produce source
-     * @return
-     */
-    public ConcurrentHashSet<String> setTopicPubInfo(String topic,
-                                                     ConcurrentHashSet<String> producerIdSet) {
-        return topicPubInfoMap.putIfAbsent(topic, producerIdSet);
-    }
-
-    /**
      * Add producer produce topic set
      *
      * @param producerId  need add producer id
      * @param topicList   need add topic set
      */
-    public void addProducerTopicPubInfo(final String producerId,
-                                        final Set<String> topicList) {
+    public void addProducerTopicPubInfo(String producerId, Set<String> topicList) {
         for (String topic : topicList) {
             ConcurrentHashSet<String> producerIdSet =
                     topicPubInfoMap.get(topic);
@@ -121,9 +125,7 @@ public class TopicPSInfoManager {
                     producerIdSet = tmpProducerIdSet;
                 }
             }
-            if (!producerIdSet.contains(producerId)) {
-                producerIdSet.add(producerId);
-            }
+            producerIdSet.add(producerId);
         }
     }
 
@@ -133,8 +135,7 @@ public class TopicPSInfoManager {
      * @param producerId  need removed producer id
      * @param topicList   need removed topic set
      */
-    public void rmvProducerTopicPubInfo(final String producerId,
-                                        final Set<String> topicList) {
+    public void rmvProducerTopicPubInfo(String producerId, Set<String> topicList) {
         if (topicList != null) {
             for (String topic : topicList) {
                 if (topic != null) {
@@ -169,7 +170,7 @@ public class TopicPSInfoManager {
         if (subTopicSet.isEmpty()) {
             // get all online group
             ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
-            List<String> onlineGroups = consumerHolder.getAllGroup();
+            List<String> onlineGroups = consumerHolder.getAllGroupName();
             if (!onlineGroups.isEmpty()) {
                 if (qryGroupSet.isEmpty()) {
                     resultSet.addAll(onlineGroups);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
new file mode 100644
index 0000000..0238109
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -0,0 +1,725 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumeGroupInfo {
+    private static final Logger logger =
+            LoggerFactory.getLogger(ConsumeGroupInfo.class);
+    private final String groupName;
+    private final ConsumeType consumeType;
+    private final long createTime;                                        //create time
+    private final Set<String> topicSet = new HashSet<>();                 //topic set
+    private final Map<String, TreeSet<String>> topicConditions =          //filter condition set
+            new HashMap<>();
+    private final ReadWriteLock csmInfoRWLock = new ReentrantReadWriteLock();
+    private final Map<String, ConsumerInfo> consumerInfoMap =   //consumer info
+            new HashMap<>();
+    // session key; consumers in the same batch share the same session key
+    private String sessionKey = "";
+    // session start time
+    private long sessionTime = TBaseConstants.META_VALUE_UNDEFINED;
+    // consumer count (specified by the client)
+    private int sourceCount = TBaseConstants.META_VALUE_UNDEFINED;
+    // select the bigger offset when offsets conflict
+    private boolean isSelectedBig = true;
+    // allocate offset flag
+    private final AtomicBoolean notAllocate =
+            new AtomicBoolean(true);
+    // current check cycle
+    private final AtomicLong curCheckCycle = new AtomicLong(0);
+    // allocate times
+    private final AtomicInteger allocatedTimes = new AtomicInteger(0);
+    // partition info
+    private final ConcurrentHashMap<String, String> partitionInfoMap =
+            new ConcurrentHashMap<>();
+    // partition offset
+    private final ConcurrentHashMap<String, Long> partOffsetMap =
+            new ConcurrentHashMap<>();
+    // load balance
+    private final ConcurrentHashMap<String, NodeRebInfo> balanceNodeMap =
+            new ConcurrentHashMap<>();
+    // config broker/client ratio
+    private int confResourceRate = TBaseConstants.META_VALUE_UNDEFINED;
+    // current broker/client ratio
+    private int curResourceRate = TBaseConstants.META_VALUE_UNDEFINED;
+    // minimal client count according to above ratio
+    private int minReqClientCnt = TBaseConstants.META_VALUE_UNDEFINED;
+    // rebalance check status
+    private int balanceChkStatus = TBaseConstants.META_VALUE_UNDEFINED;
+    // log print flag
+    private boolean enableBalanceChkPrint = true;
+    //
+    private final AtomicLong csmCtrlId =
+            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+    private final AtomicLong topicMetaInfoId =
+            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+    private final ConcurrentHashMap<String, String> topicMetaInfoMap =
+            new ConcurrentHashMap<>();
+    private final AtomicLong lastMetaInfoFreshTime =
+            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+
+    public ConsumeGroupInfo(ConsumerInfo consumer) {
+        this.groupName = consumer.getGroupName();
+        this.consumeType = consumer.getConsumeType();
+        this.topicSet.addAll(consumer.getTopicSet());
+        this.topicConditions.putAll(consumer.getTopicConditions());
+        this.createTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Add consumer to consume group
+     *
+     * @param inConsumer consumer object
+     */
+    public boolean addConsumer(ConsumerInfo inConsumer,
+                               StringBuilder sBuffer,
+                               ParamCheckResult result) {
+        try {
+            csmInfoRWLock.writeLock().lock();
+            if (this.consumerInfoMap.isEmpty()) {
+                if (this.consumeType == ConsumeType.CONSUME_BAND) {
+                    this.sessionKey = inConsumer.getSessionKey();
+                    this.sessionTime = inConsumer.getStartTime();
+                    this.sourceCount = inConsumer.getSourceCount();
+                    this.isSelectedBig = inConsumer.isSelectedBig();
+                    this.curCheckCycle.set(0);
+                } else if (this.consumeType == ConsumeType.CONSUME_CLIENT_REB) {
+                    this.sourceCount = inConsumer.getSourceCount();
+                }
+            } else {
+                if (!validConsumerInfo(inConsumer, sBuffer, result)) {
+                    return false;
+                }
+                ConsumerInfo curConsumerInfo =
+                        consumerInfoMap.get(inConsumer.getConsumerId());
+                if (curConsumerInfo != null) {
+                    curConsumerInfo.updCurConsumerInfo(inConsumer);
+                    result.setCheckData("Ok!");
+                    return true;
+                }
+            }
+            this.consumerInfoMap.put(inConsumer.getConsumerId(), inConsumer);
+            if (consumeType == ConsumeType.CONSUME_BAND) {
+                bookPartitionInfo(inConsumer);
+            }
+            result.setCheckData("Ok!");
+            return true;
+        } finally {
+            csmInfoRWLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Remove a consumer from the group
+     *
+     * Besides the consumer record itself, the consumer's entry in the balance-node map
+     * and every partition booking (owner and offset) held by it are dropped.
+     *
+     * @param consumerId consumer id
+     * @return  the removed consumer object, or null if consumerId is null or not registered
+     */
+    public ConsumerInfo removeConsumer(String consumerId) {
+        if (consumerId == null) {
+            return null;
+        }
+        // acquire the lock before entering try, so that the finally block
+        // can never unlock a lock that was not obtained
+        csmInfoRWLock.writeLock().lock();
+        try {
+            this.balanceNodeMap.remove(consumerId);
+            // collect the keys first and remove afterwards, so the map is
+            // not modified while it is being iterated
+            List<String> partKeyList = new ArrayList<>();
+            for (Map.Entry<String, String> entry : partitionInfoMap.entrySet()) {
+                if (consumerId.equals(entry.getValue())) {
+                    partKeyList.add(entry.getKey());
+                }
+            }
+            for (String partKey : partKeyList) {
+                partitionInfoMap.remove(partKey);
+                partOffsetMap.remove(partKey);
+            }
+            return this.consumerInfoMap.remove(consumerId);
+        } finally {
+            csmInfoRWLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Get the number of consumers currently registered in this group
+     *
+     * @return the size of the consumer map
+     */
+    public int getGroupCnt() {
+        // lock outside of try: unlock() must only run when lock() succeeded
+        csmInfoRWLock.readLock().lock();
+        try {
+            return this.consumerInfoMap.size();
+        } finally {
+            csmInfoRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Add node balance info
+     *
+     * If the client already has a balance record, the request is ignored unless that
+     * record's status is 4, in which case the old record is dropped and a new one may
+     * be added. A new record is only created for a client that is registered in the group.
+     *
+     * @param clientId        need add client id
+     * @param waitDuration    wait duration
+     */
+    public void addNodeRelInfo(String clientId, int waitDuration) {
+        NodeRebInfo nodeRebInfo = this.balanceNodeMap.get(clientId);
+        if (nodeRebInfo != null) {
+            if (nodeRebInfo.getStatus() != 4) {
+                // a record that is still being handled already exists
+                return;
+            }
+            this.balanceNodeMap.remove(clientId);
+        }
+        // lock outside of try: unlock() must only run when lock() succeeded
+        csmInfoRWLock.readLock().lock();
+        try {
+            if (consumerInfoMap.containsKey(clientId)) {
+                this.balanceNodeMap.putIfAbsent(clientId,
+                        new NodeRebInfo(clientId, waitDuration));
+            }
+        } finally {
+            csmInfoRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Scan the balance-node records and collect the clients to handle in this round
+     *
+     * Records with status 0 are switched to status 1 and reported as "need process".
+     * Records with request type 1 and status 2 have their wait duration decremented:
+     * while it stays positive the client is reported as "need escape", once it reaches
+     * zero or below the record is set to status 4 and removed from the map.
+     *
+     * @return the clients to process and the clients to escape
+     */
+    public RebProcessInfo getNeedBalanceNodes() {
+        List<String> processIds = new ArrayList<>();
+        List<String> escapeIds = new ArrayList<>();
+        List<String> expiredIds = new ArrayList<>();
+        for (NodeRebInfo rebInfo : this.balanceNodeMap.values()) {
+            if (rebInfo.getStatus() == 0) {
+                rebInfo.setStatus(1);
+                processIds.add(rebInfo.getClientId());
+                continue;
+            }
+            if (rebInfo.getReqType() != 1
+                    || rebInfo.getStatus() != 2) {
+                continue;
+            }
+            if (rebInfo.decrAndGetWaitDuration() > 0) {
+                escapeIds.add(rebInfo.getClientId());
+                continue;
+            }
+            rebInfo.setStatus(4);
+            expiredIds.add(rebInfo.getClientId());
+        }
+        for (String expiredId : expiredIds) {
+            this.balanceNodeMap.remove(expiredId);
+        }
+        return new RebProcessInfo(processIds, escapeIds);
+    }
+
+    /**
+     * Mark the given clients' balance records as processed
+     *
+     * For every record whose client id is in the list: records with request type 0 are
+     * set to status 4 and removed from the map, all other records are set to status 2.
+     *
+     * @param processList ids of the clients that have been processed; null or empty is a no-op
+     */
+    public void setBalanceNodeProcessed(List<String> processList) {
+        if (processList == null || processList.isEmpty()) {
+            return;
+        }
+        if (this.balanceNodeMap.isEmpty()) {
+            return;
+        }
+        List<String> finishedIds = new ArrayList<>();
+        for (NodeRebInfo rebInfo : this.balanceNodeMap.values()) {
+            if (!processList.contains(rebInfo.getClientId())) {
+                continue;
+            }
+            if (rebInfo.getReqType() != 0) {
+                rebInfo.setStatus(2);
+                continue;
+            }
+            rebInfo.setStatus(4);
+            finishedIds.add(rebInfo.getClientId());
+        }
+        for (String finishedId : finishedIds) {
+            this.balanceNodeMap.remove(finishedId);
+        }
+    }
+
+    /**
+     * Whether a bound-consume group still has booked partitions awaiting allocation
+     *
+     * @return true only if the consume type is CONSUME_BAND, the notAllocate flag is set,
+     *         at least one partition is booked and fewer than 2 allocations were counted
+     */
+    public boolean isUnReadyServerBalance() {
+        if (consumeType != ConsumeType.CONSUME_BAND) {
+            return false;
+        }
+        if (!notAllocate.get()) {
+            return false;
+        }
+        if (partitionInfoMap.isEmpty()) {
+            return false;
+        }
+        return allocatedTimes.get() < 2;
+    }
+
+    /**
+     * Whether the group can skip the bound-consume allocation
+     *
+     * @return true if the consume type is not CONSUME_BAND, or the notAllocate flag is
+     *         cleared, or no partition is booked, or at least 2 allocations were counted
+     */
+    public boolean isNotNeedBoundBalance() {
+        if (consumeType != ConsumeType.CONSUME_BAND) {
+            return true;
+        }
+        if (!notAllocate.get()) {
+            return true;
+        }
+        if (partitionInfoMap.isEmpty()) {
+            return true;
+        }
+        return allocatedTimes.get() >= 2;
+    }
+
+    /**
+     * Whether the resource check applies to this group
+     *
+     * @return true if the consume type is CONSUME_NORMAL and balanceChkStatus is not positive
+     */
+    public boolean needResourceCheck() {
+        return (consumeType == ConsumeType.CONSUME_NORMAL && balanceChkStatus <= 0);
+    }
+
+    /**
+     * Get the consume type of this group
+     *
+     * @return the consume type
+     */
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+    /**
+     * Whether the group uses the client-rebalance consume type
+     *
+     * @return true if the consume type is CONSUME_CLIENT_REB
+     */
+    public boolean isClientBalance() {
+        return consumeType == ConsumeType.CONSUME_CLIENT_REB;
+    }
+
+    /**
+     * Get the group name
+     *
+     * @return the group name
+     */
+    public String getGroupName() {
+        return groupName;
+    }
+
+    /**
+     * Get the session key recorded for the group
+     *
+     * @return the session key
+     */
+    public String getSessionKey() {
+        return sessionKey;
+    }
+
+    /**
+     * Get the current value of the notAllocate flag
+     *
+     * @return true while the flag is still set
+     */
+    public boolean isNotAllocate() {
+        return notAllocate.get();
+    }
+
+    /**
+     * Clear the notAllocate flag
+     *
+     * The flag is switched from true to false with a compare-and-set;
+     * if it is already false nothing changes.
+     */
+    public void settAllocated() {
+        this.notAllocate.compareAndSet(true, false);
+    }
+
+    /**
+     * Get the allocation counter
+     *
+     * @return the current value of allocatedTimes
+     */
+    public int getAllocatedTimes() {
+        return allocatedTimes.get();
+    }
+
+    /**
+     * Increase the allocation counter by one
+     */
+    public void addAllocatedTimes() {
+        this.allocatedTimes.incrementAndGet();
+    }
+
+    /**
+     * Whether the group has no registered consumer
+     *
+     * @return true if the consumer map is empty
+     */
+    public boolean isGroupEmpty() {
+        // lock outside of try: unlock() must only run when lock() succeeded
+        csmInfoRWLock.readLock().lock();
+        try {
+            return consumerInfoMap.isEmpty();
+        } finally {
+            csmInfoRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Whether the number of registered consumers has reached sourceCount
+     *
+     * @return true if the consumer map holds at least sourceCount entries
+     */
+    public boolean isGroupFullSize() {
+        // lock outside of try: unlock() must only run when lock() succeeded
+        csmInfoRWLock.readLock().lock();
+        try {
+            return this.consumerInfoMap.size() >= this.sourceCount;
+        } finally {
+            csmInfoRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Whether there is no pending node balance record
+     *
+     * @return true if the balance-node map is empty
+     */
+    public boolean isBalanceMapEmpty() {
+        return this.balanceNodeMap.isEmpty();
+    }
+
+    /**
+     * Get the session time recorded for the group
+     *
+     * @return the session time
+     */
+    public long getSessionTime() {
+        return sessionTime;
+    }
+
+    /**
+     * Get the source count recorded for the group
+     *
+     * @return the source count
+     */
+    public int getSourceCount() {
+        return sourceCount;
+    }
+
+    /**
+     * Get the create time of this group record
+     *
+     * @return the create time
+     */
+    public long getCreateTime() {
+        return createTime;
+    }
+
+    /**
+     * Get the current check cycle counter
+     *
+     * @return the current value of curCheckCycle
+     */
+    public long getCurCheckCycle() {
+        return curCheckCycle.get();
+    }
+
+    /**
+     * Increase the check cycle counter by one
+     *
+     * @return the counter value after the increment
+     */
+    public long addCurCheckCycle() {
+        return curCheckCycle.addAndGet(1);
+    }
+
+    /**
+     * Record the result of the broker/client ratio check for this group
+     *
+     * Calling this method also switches enableBalanceChkPrint off, and sets
+     * balanceChkStatus to 1 when rebalanced or 0 otherwise.
+     *
+     * @param confResourceRate configured broker/client ratio
+     * @param curResourceRate  current broker/client ratio
+     * @param minReqClientCnt  minimal client count
+     * @param isRebalanced     Whether balanced
+     */
+    public void setConsumeResourceInfo(int confResourceRate,
+                                       int curResourceRate,
+                                       int minReqClientCnt,
+                                       boolean isRebalanced) {
+        this.confResourceRate = confResourceRate;
+        this.curResourceRate = curResourceRate;
+        this.minReqClientCnt = minReqClientCnt;
+        this.enableBalanceChkPrint = false;
+        this.balanceChkStatus = isRebalanced ? 1 : 0;
+    }
+
+    /**
+     * Get the enableBalanceChkPrint flag
+     *
+     * @return the current value of the flag
+     */
+    public boolean isEnableBalanceChkPrint() {
+        return enableBalanceChkPrint;
+    }
+
+    /**
+     * Get the configured broker/client ratio
+     *
+     * @return the configured ratio
+     */
+    public int getConfResourceRate() {
+        return confResourceRate;
+    }
+
+    /**
+     * Get the current broker/client ratio
+     *
+     * @return the current ratio
+     */
+    public int getCurResourceRate() {
+        return curResourceRate;
+    }
+
+    /**
+     * Get the minimal client count
+     *
+     * @return the minimal client count
+     */
+    public int getMinReqClientCnt() {
+        return minReqClientCnt;
+    }
+
+    /**
+     * Get the balance check status
+     *
+     * @return the current value of balanceChkStatus
+     */
+    public int getBalanceChkStatus() {
+        return balanceChkStatus;
+    }
+
+    /**
+     * Get the topics subscribed by the group
+     *
+     * @return the internal topic set itself, not a copy
+     */
+    public Set<String> getTopicSet() {
+        return this.topicSet;
+    }
+
+    /**
+     * Get the isSelectedBig setting of the group
+     *
+     * @return the current value of isSelectedBig
+     */
+    public boolean isSelectedBig() {
+        return isSelectedBig;
+    }
+
+    /**
+     * Get the filter conditions of the subscribed topics
+     *
+     * @return the internal topic to filter conditions map itself, not a copy
+     */
+    public Map<String, TreeSet<String>> getTopicConditions() {
+        return this.topicConditions;
+    }
+
+    /**
+     * Get a snapshot of the consumers registered in the group
+     *
+     * @return a new list holding the current consumer objects
+     */
+    public List<ConsumerInfo> getConsumerInfoList() {
+        // lock outside of try: unlock() must only run when lock() succeeded
+        csmInfoRWLock.readLock().lock();
+        try {
+            return new ArrayList<>(this.consumerInfoMap.values());
+        } finally {
+            csmInfoRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Get a snapshot of the ids of the consumers registered in the group
+     *
+     * @return a new list holding the current consumer ids
+     */
+    public List<String> getConsumerIdList() {
+        // lock outside of try: unlock() must only run when lock() succeeded
+        csmInfoRWLock.readLock().lock();
+        try {
+            return new ArrayList<>(this.consumerInfoMap.keySet());
+        } finally {
+            csmInfoRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Get the consumer registered under the given id
+     *
+     * @param consumerId consumer id
+     * @return the consumer object, or null if the map has no entry for the id
+     */
+    public ConsumerInfo getConsumerInfo(String consumerId) {
+        // lock outside of try: unlock() must only run when lock() succeeded
+        csmInfoRWLock.readLock().lock();
+        try {
+            return consumerInfoMap.get(consumerId);
+        } finally {
+            csmInfoRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Get the view info string of every consumer registered in the group
+     *
+     * @return a new list with one view info entry per non-null consumer
+     */
+    public List<String> getConsumerViewInfos() {
+        List<String> result = new ArrayList<>();
+        // lock outside of try: unlock() must only run when lock() succeeded
+        csmInfoRWLock.readLock().lock();
+        try {
+            for (ConsumerInfo consumerInfo : this.consumerInfoMap.values()) {
+                if (consumerInfo != null) {
+                    result.add(consumerInfo.getConsumerViewInfo());
+                }
+            }
+        } finally {
+            csmInfoRWLock.readLock().unlock();
+        }
+        return result;
+    }
+
+    /**
+     * Get the (consumer id, TLS flag) tuple of every consumer registered in the group
+     *
+     * @return a new list with one tuple per non-null consumer
+     */
+    public List<Tuple2<String, Boolean>> getConsumerIdAndTlsInfos() {
+        List<Tuple2<String, Boolean>> result = new ArrayList<>();
+        // lock outside of try: unlock() must only run when lock() succeeded
+        csmInfoRWLock.readLock().lock();
+        try {
+            for (ConsumerInfo consumerInfo : this.consumerInfoMap.values()) {
+                if (consumerInfo != null) {
+                    result.add(consumerInfo.getConsumerIdAndTlsInfoTuple());
+                }
+            }
+        } finally {
+            csmInfoRWLock.readLock().unlock();
+        }
+        return result;
+    }
+
+    /**
+     * Get the booked partition owners
+     *
+     * @return the internal partition key to consumer id map itself, not a copy
+     */
+    public Map<String, String> getPartitionInfoMap() {
+        return this.partitionInfoMap;
+    }
+
+    /**
+     * Get the booked partition offsets
+     *
+     * @return the internal partition key to offset map itself, not a copy
+     */
+    public Map<String, Long> getPartOffsetMap() {
+        return this.partOffsetMap;
+    }
+
+    /**
+     * Get the node balance records
+     *
+     * @return the internal client id to balance record map itself, not a copy
+     */
+    public Map<String, NodeRebInfo> getBalanceMap() {
+        return this.balanceNodeMap;
+    }
+
+    /**
+     * Book the partitions (and their offsets) reported by a bound consumer
+     *
+     * For every partition the consumer requires, the consumer becomes the booked owner
+     * when no comparable booking exists: no owner yet, owner no longer registered, or
+     * owner without a reported offset for that partition. Otherwise the two offsets are
+     * compared: with isSelectedBig the new consumer takes over when its offset is greater
+     * than or equal to the owner's, without it only when its offset is strictly smaller.
+     *
+     * @param consumer consumer info
+     */
+    private void bookPartitionInfo(ConsumerInfo consumer) {
+        if (consumeType != ConsumeType.CONSUME_BAND) {
+            return;
+        }
+        Map<String, Long> reqPartMap = consumer.getRequiredPartition();
+        if (reqPartMap == null || reqPartMap.isEmpty()) {
+            return;
+        }
+        for (Map.Entry<String, Long> reqEntry : reqPartMap.entrySet()) {
+            String partKey = reqEntry.getKey();
+            // resolve the offset reported by the currently booked owner, if there is one
+            String ownerId = this.partitionInfoMap.get(partKey);
+            ConsumerInfo ownerInfo =
+                    (ownerId == null) ? null : this.consumerInfoMap.get(ownerId);
+            Map<String, Long> ownerPartMap =
+                    (ownerInfo == null) ? null : ownerInfo.getRequiredPartition();
+            Long ownerOffset = (ownerPartMap == null || ownerPartMap.isEmpty())
+                    ? null : ownerPartMap.get(partKey);
+            // decide whether the reporting consumer replaces the booking
+            boolean takeOver;
+            if (ownerOffset == null) {
+                takeOver = true;
+            } else if (this.isSelectedBig) {
+                takeOver = reqEntry.getValue() >= ownerOffset;
+            } else {
+                takeOver = reqEntry.getValue() < ownerOffset;
+            }
+            if (takeOver) {
+                this.partitionInfoMap.put(partKey, consumer.getConsumerId());
+                this.partOffsetMap.put(partKey, reqEntry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Check whether a joining consumer is consistent with the settings recorded for the group
+     *
+     * The checks run in this order: consume type, subscribed topic set, per-topic filter
+     * conditions, then the type specific checks (bound parameters for CONSUME_BAND;
+     * sourceCount and nodeId uniqueness for CONSUME_CLIENT_REB when sourceCount is positive).
+     * On the first failure the error code and message are stored in result, the message
+     * is logged, and sBuffer is emptied so that the caller can reuse it.
+     *
+     * @param inConsumer consumer info
+     * @param sBuffer string buffer used to build the error message, emptied on every failure path
+     * @param result process result
+     * @return true if valid, or false if invalid
+     */
+    private boolean validConsumerInfo(ConsumerInfo inConsumer,
+                                      StringBuilder sBuffer,
+                                      ParamCheckResult result) {
+        // check whether the consumer behavior is consistent
+        if (inConsumer.getConsumeType() != this.consumeType) {
+            sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+                    .append(" using ").append(inConsumer.getConsumeType().getName())
+                    .append(" subscribe is inconsistency with other consumers using ")
+                    .append(this.consumeType.getName())
+                    .append(" subscribe in the group");
+            result.setCheckResult(false,
+                    TErrCodeConstants.CLIENT_INCONSISTENT_CONSUMETYPE, sBuffer.toString());
+            logger.warn(sBuffer.toString());
+            sBuffer.delete(0, sBuffer.length());
+            return false;
+        }
+        // check the topics of consumption
+        if (CollectionUtils.isNotEmpty(topicSet)
+                && (topicSet.size() != inConsumer.getTopicSet().size()
+                || !topicSet.containsAll(inConsumer.getTopicSet()))) {
+            sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+                    .append(" subscribed topics ").append(inConsumer.getTopicSet())
+                    .append(" is inconsistency with other consumers in the group, existedTopics: ")
+                    .append(topicSet);
+            result.setCheckResult(false,
+                    TErrCodeConstants.CLIENT_INCONSISTENT_TOPICSET, sBuffer.toString());
+            logger.warn(sBuffer.toString());
+            sBuffer.delete(0, sBuffer.length());
+            return false;
+        }
+        // check the topic conditions of consumption
+        boolean isCondEqual = true;
+        if (topicConditions == null || topicConditions.isEmpty()) {
+            if (!inConsumer.getTopicConditions().isEmpty()) {
+                isCondEqual = false;
+                sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+                        .append(" subscribe with filter condition ")
+                        .append(inConsumer.getTopicConditions())
+                        .append(" is inconsistency with other consumers in the group: topic without conditions");
+            }
+        } else {
+            // check the filter conditions of the topic
+            if (inConsumer.getTopicConditions().isEmpty()) {
+                isCondEqual = false;
+                sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+                        .append(" subscribe without filter condition ")
+                        .append(" is inconsistency with other consumers in the group, existed topic conditions is ")
+                        .append(topicConditions);
+            } else {
+                Set<String> existedCondTopics = topicConditions.keySet();
+                Set<String> reqCondTopics = inConsumer.getTopicConditions().keySet();
+                if (existedCondTopics.size() != reqCondTopics.size()
+                        || !existedCondTopics.containsAll(reqCondTopics)) {
+                    isCondEqual = false;
+                    sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+                            .append(" subscribe with filter condition ")
+                            .append(inConsumer.getTopicConditions())
+                            .append(" is inconsistency with other consumers in the group, existed topic conditions is ")
+                            .append(topicConditions);
+                } else {
+                    // same topics on both sides: compare the condition items topic by topic
+                    for (String topicKey : existedCondTopics) {
+                        if ((topicConditions.get(topicKey).size()
+                                != inConsumer.getTopicConditions().get(topicKey).size())
+                                || (!topicConditions.get(topicKey).containsAll(
+                                inConsumer.getTopicConditions().get(topicKey)))) {
+                            isCondEqual = false;
+                            sBuffer.append("[Inconsistency subscribe] ")
+                                    .append(inConsumer.getConsumerId())
+                                    .append(" subscribe with filter condition ")
+                                    .append(inConsumer.getTopicConditions())
+                                    .append(" is inconsistency with other consumers in the group,")
+                                    .append(" existed topic conditions is ")
+                                    .append(topicConditions);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        if (!isCondEqual) {
+            result.setCheckResult(false,
+                    TErrCodeConstants.CLIENT_INCONSISTENT_FILTERSET, sBuffer.toString());
+            logger.warn(sBuffer.toString());
+            sBuffer.delete(0, sBuffer.length());
+            return false;
+        }
+        // Check the validity of bound consumer's parameters
+        if (this.consumeType == ConsumeType.CONSUME_BAND) {
+            if (!validBoundParameters(inConsumer, sBuffer, result)) {
+                // make sure no failure message is left behind in the shared buffer
+                sBuffer.delete(0, sBuffer.length());
+                return false;
+            }
+        } else if (this.consumeType == ConsumeType.CONSUME_CLIENT_REB) {
+            if (this.sourceCount > 0) {
+                if (this.sourceCount != inConsumer.getSourceCount()) {
+                    sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+                            .append("'s sourceCount is inconsistency with other consumers in the group, required is ")
+                            .append(sourceCount).append(", request is ").append(inConsumer.getSourceCount());
+                    result.setCheckResult(false,
+                            TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, sBuffer.toString());
+                    logger.warn(sBuffer.toString());
+                    sBuffer.delete(0, sBuffer.length());
+                    return false;
+                }
+                // look for another consumer that already uses the same nodeId
+                String occupiedConsumerId = null;
+                for (ConsumerInfo consumerInfo : consumerInfoMap.values()) {
+                    if (consumerInfo == null) {
+                        continue;
+                    }
+                    if (consumerInfo.getNodeId() == inConsumer.getNodeId()
+                            && !consumerInfo.getConsumerId().equals(inConsumer.getConsumerId())) {
+                        occupiedConsumerId = consumerInfo.getConsumerId();
+                        break;
+                    }
+                }
+                if (occupiedConsumerId != null) {
+                    sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+                            .append("'s nodeId value(").append(inConsumer.getNodeId())
+                            .append(") is occupied by ").append(occupiedConsumerId)
+                            .append(" in the group!");
+                    result.setCheckResult(false,
+                            TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, sBuffer.toString());
+                    logger.warn(sBuffer.toString());
+                    sBuffer.delete(0, sBuffer.length());
+                    return false;
+                }
+            }
+        }
+        result.setCheckData("Ok");
+        return true;
+    }
+
+    /**
+     * Check the validity of bound consumer's parameters
+     *
+     * For a CONSUME_BAND group the consumer's sessionKey, isSelectedBig and sourceCount
+     * must all equal the values recorded for the group. On the first mismatch the error
+     * code and message are stored in result, the message is logged, and sBuffer is
+     * emptied so that the caller can reuse it. For other consume types nothing is checked.
+     *
+     * @param inConsumer consumer info
+     * @param sBuffer string buffer used to build the error message, emptied on every failure path
+     * @param result process result
+     * @return true if valid, or false if invalid
+     */
+    private boolean validBoundParameters(ConsumerInfo inConsumer,
+                                         StringBuilder sBuffer,
+                                         ParamCheckResult result) {
+        if (consumeType != ConsumeType.CONSUME_BAND) {
+            result.setCheckData("");
+            return true;
+        }
+        // If the sessionKey is inconsistent, the previous round of consumption has not
+        // completely exited. To avoid an incomplete offset setting, the data of the previous
+        // round must be cleared completely before this round resets offsets and consumes
+        if (!sessionKey.equals(inConsumer.getSessionKey())) {
+            sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+                    .append("'s sessionKey is inconsistency with other consumers in the group, required is ")
+                    .append(sessionKey).append(", request is ").append(inConsumer.getSessionKey());
+            result.setCheckResult(false,
+                    TErrCodeConstants.CLIENT_INCONSISTENT_SESSIONKEY, sBuffer.toString());
+            logger.warn(sBuffer.toString());
+            sBuffer.delete(0, sBuffer.length());
+            return false;
+        }
+        // check the offset config
+        if (isSelectedBig != inConsumer.isSelectedBig()) {
+            sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+                    .append("'s isSelectBig is inconsistency with other consumers in the group, required is ")
+                    .append(isSelectedBig).append(", request is ").append(inConsumer.isSelectedBig());
+            result.setCheckResult(false,
+                    TErrCodeConstants.CLIENT_INCONSISTENT_SELECTBIG, sBuffer.toString());
+            logger.warn(sBuffer.toString());
+            sBuffer.delete(0, sBuffer.length());
+            return false;
+        }
+        // check the consumers count
+        if (sourceCount != inConsumer.getSourceCount()) {
+            sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+                    .append("'s sourceCount is inconsistency with other consumers in the group, required is ")
+                    .append(sourceCount).append(", request is ").append(inConsumer.getSourceCount());
+            result.setCheckResult(false,
+                    TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, sBuffer.toString());
+            logger.warn(sBuffer.toString());
+            sBuffer.delete(0, sBuffer.length());
+            return false;
+        }
+        result.setCheckData("Ok");
+        return true;
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeType.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeType.java
new file mode 100644
index 0000000..b390d41
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeType.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
+
+/**
+ * The consume types a consumer group can use, each with a numeric code,
+ * a short name and a description.
+ */
+public enum ConsumeType {
+    CONSUME_NORMAL(0, "unbound", "Normal consume without reset offset"),
+    CONSUME_BAND(1, "bound", "Consume data with reset offset"),
+    CONSUME_CLIENT_REB(2, "client-rebalance", "Consume data with client assigned partitions");
+
+    private final int code;
+    private final String name;
+    private final String description;
+
+    ConsumeType(int code, String name, String description) {
+        this.code = code;
+        this.name = name;
+        this.description = description;
+    }
+
+    /**
+     * @return the numeric code of this consume type
+     */
+    public int getCode() {
+        return this.code;
+    }
+
+    /**
+     * @return the short name of this consume type
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @return the description of this consume type
+     */
+    public String getDescription() {
+        return this.description;
+    }
+
+    /**
+     * Look up the consume type that carries the given code
+     *
+     * @param code the numeric code
+     * @return the matching consume type
+     * @throws IllegalArgumentException if no consume type uses the code
+     */
+    public static ConsumeType valueOf(int code) {
+        ConsumeType[] allTypes = ConsumeType.values();
+        for (int idx = 0; idx < allTypes.length; idx++) {
+            if (allTypes[idx].code == code) {
+                return allTypes[idx];
+            }
+        }
+        throw new IllegalArgumentException(String.format("unknown ConsumeType code %s", code));
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
deleted file mode 100644
index b8cdda6..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
-
-public class ConsumerBandInfo {
-
-    private boolean isBandConsume = false;
-
-    //session key, the same batch consumer have the same session key
-    private String sessionKey = "";
-    private long sessionTime = -1;  //session start time
-    private int sourceCount = 0;    //consumer count(specific by client)
-    private boolean isSelectedBig = true;   //select the bigger offset when offset conflict
-    private AtomicBoolean notAllocate = new AtomicBoolean(true); //allocate offset flag
-    private AtomicLong curCheckCycle = new AtomicLong(0);       //current check cycle
-    private AtomicInteger allocatedTimes = new AtomicInteger(0);    //allocate times
-    private long createTime = System.currentTimeMillis();           //create time
-    private Set<String> topicSet = new HashSet<>();                 //topic set
-    private Map<String, TreeSet<String>> topicConditions =          //filter condition set
-            new HashMap<>();
-    private ConcurrentHashMap<String, ConsumerInfo> consumerInfoMap =   //consumer info
-            new ConcurrentHashMap<>();
-    private ConcurrentHashMap<String, String> partitionInfoMap =        //partition info
-            new ConcurrentHashMap<>();
-    private ConcurrentHashMap<String, Long> partOffsetMap =             //partition offset
-            new ConcurrentHashMap<>();
-    private ConcurrentHashMap<String, NodeRebInfo> rebalanceMap =       //load balance
-            new ConcurrentHashMap<>();
-    private int defBClientRate = -2;            //default broker/client ratio
-    private int confBClientRate = -2;           //config broker/client ratio
-    private int curBClientRate = -2;            //current broker/client ratio
-    private int minRequireClientCnt = -2;       //minimal client count according to above ratio
-    private int rebalanceCheckStatus = -2;      //rebalance check status
-    private boolean rebalanceCheckPrint = true;   //log print flag
-
-    public ConsumerBandInfo(boolean isSelectedBig) {
-        this.sessionKey = "";
-        this.sessionTime = -1;
-        this.sourceCount = -1;
-        this.curCheckCycle.set(0);
-        this.allocatedTimes.set(0);
-        this.notAllocate.set(true);
-        this.isBandConsume = false;
-        this.isSelectedBig = isSelectedBig;
-    }
-
-    public ConsumerBandInfo(boolean isBandConsume, String sessionKey,
-                            long sessionTime, int sourceCount, long createTime,
-                            long curCheckCycle, boolean notAllocate, int allocatedTimes,
-                            boolean isSelectedBig, Set<String> topicSet,
-                            Map<String, TreeSet<String>> topicConditions,
-                            Map<String, ConsumerInfo> consumerInfoMap,
-                            Map<String, String> partitionInfoMap,
-                            Map<String, Long> partOffsetMap,
-                            Map<String, NodeRebInfo> rebalanceMap,
-                            int defBClientRate, int confBClientRate,
-                            int curBClientRate, int minRequireClientCnt,
-                            int rebalanceCheckStatus, boolean rebalanceCheckPrint) {
-        this.isBandConsume = isBandConsume;
-        this.sessionKey = sessionKey;
-        this.sessionTime = sessionTime;
-        this.sourceCount = sourceCount;
-        this.createTime = createTime;
-        this.isSelectedBig = isSelectedBig;
-        this.notAllocate.set(notAllocate);
-        this.allocatedTimes.set(allocatedTimes);
-        this.curCheckCycle.set(curCheckCycle);
-        this.defBClientRate = defBClientRate;
-        this.confBClientRate = confBClientRate;
-        this.curBClientRate = curBClientRate;
-        this.minRequireClientCnt = minRequireClientCnt;
-        this.rebalanceCheckStatus = rebalanceCheckStatus;
-        this.rebalanceCheckPrint = rebalanceCheckPrint;
-        this.topicSet.addAll(topicSet);
-        for (Map.Entry<String, ConsumerInfo> entry : consumerInfoMap.entrySet()) {
-            this.consumerInfoMap.put(entry.getKey(), entry.getValue().clone());
-        }
-        for (Map.Entry<String, TreeSet<String>> entry : topicConditions.entrySet()) {
-            this.topicConditions.put(entry.getKey(), entry.getValue());
-        }
-        for (Map.Entry<String, String> entry : partitionInfoMap.entrySet()) {
-            if (entry.getValue() != null) {
-                this.partitionInfoMap.put(entry.getKey(), entry.getValue());
-            }
-        }
-        for (Map.Entry<String, Long> entry : partOffsetMap.entrySet()) {
-            if (entry.getValue() != null) {
-                this.partOffsetMap.put(entry.getKey(), entry.getValue());
-            }
-        }
-        if (rebalanceMap != null) {
-            for (Map.Entry<String, NodeRebInfo> entry : rebalanceMap.entrySet()) {
-                if (entry.getValue() != null) {
-                    this.rebalanceMap.put(entry.getKey(), entry.getValue().clone());
-                }
-            }
-        }
-    }
-
-    /**
-     * Add consumer
-     *
-     * @param consumer
-     */
-    public void addConsumer(ConsumerInfo consumer) {
-        if (this.consumerInfoMap.isEmpty()) {
-            this.isBandConsume = consumer.isRequireBound();
-            if (this.isBandConsume) {
-                this.sessionKey = consumer.getSessionKey();
-                this.sessionTime = consumer.getStartTime();
-                this.sourceCount = consumer.getSourceCount();
-                this.createTime = System.currentTimeMillis();
-                this.curCheckCycle.set(0);
-            }
-            this.topicSet.addAll(consumer.getTopicSet());
-            for (Map.Entry<String, TreeSet<String>> entry
-                    : consumer.getTopicConditions().entrySet()) {
-                this.topicConditions.put(entry.getKey(), entry.getValue());
-            }
-        }
-        this.consumerInfoMap.put(consumer.getConsumerId(), consumer);
-        Map<String, Long> consumerPartMap = consumer.getRequiredPartition();
-        if (!this.isBandConsume
-                || consumerPartMap == null
-                || consumerPartMap.isEmpty()) {
-            return;
-        }
-        for (Map.Entry<String, Long> entry : consumerPartMap.entrySet()) {
-            String oldClientId = this.partitionInfoMap.get(entry.getKey());
-            if (oldClientId == null) {
-                this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
-                this.partOffsetMap.put(entry.getKey(), entry.getValue());
-            } else {
-                ConsumerInfo oldConsumerInfo = this.consumerInfoMap.get(oldClientId);
-                if (oldConsumerInfo == null) {
-                    this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
-                    this.partOffsetMap.put(entry.getKey(), entry.getValue());
-                } else {
-                    Map<String, Long> oldConsumerPartMap = oldConsumerInfo.getRequiredPartition();
-                    if (oldConsumerPartMap == null || oldConsumerPartMap.isEmpty()) {
-                        this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
-                        this.partOffsetMap.put(entry.getKey(), entry.getValue());
-                    } else {
-                        Long oldConsumerOff = oldConsumerPartMap.get(entry.getKey());
-                        if (oldConsumerOff == null) {
-                            this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
-                            this.partOffsetMap.put(entry.getKey(), entry.getValue());
-                        } else {
-                            if (this.isSelectedBig) {
-                                if (entry.getValue() >= oldConsumerOff) {
-                                    this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
-                                    this.partOffsetMap.put(entry.getKey(), entry.getValue());
-                                }
-                            } else {
-                                if (entry.getValue() < oldConsumerOff) {
-                                    this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
-                                    this.partOffsetMap.put(entry.getKey(), entry.getValue());
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-    }
-
-    /**
-     * Remove consumer
-     *
-     * @param consumerId
-     * @return
-     */
-    public ConsumerInfo removeConsumer(String consumerId) {
-        if (consumerId == null) {
-            return null;
-        }
-        this.rebalanceMap.remove(consumerId);
-        List<String> partKeyList = new ArrayList<>();
-        for (Map.Entry<String, String> entry : partitionInfoMap.entrySet()) {
-            if (entry.getValue() != null) {
-                if (entry.getValue().equals(consumerId)) {
-                    partKeyList.add(entry.getKey());
-                }
-            }
-        }
-        for (String partKey : partKeyList) {
-            partitionInfoMap.remove(partKey);
-            partOffsetMap.remove(partKey);
-        }
-        return this.consumerInfoMap.remove(consumerId);
-    }
-
-    public void clear() {
-        this.isBandConsume = false;
-        this.notAllocate.set(true);
-        this.sessionKey = "";
-        this.sessionTime = -1;
-        this.sourceCount = -1;
-        this.createTime = -1;
-        this.curCheckCycle.set(0);
-        this.allocatedTimes.set(0);
-        this.topicSet.clear();
-        this.consumerInfoMap.clear();
-        this.topicConditions.clear();
-        this.partitionInfoMap.clear();
-        this.partOffsetMap.clear();
-        this.rebalanceMap.clear();
-    }
-
-    /**
-     * Get consumer group count
-     *
-     * @return group count
-     */
-    public int getGroupCnt() {
-        return this.consumerInfoMap.size();
-    }
-
-    /**
-     * Add node rebalance info
-     *
-     * @param clientId
-     * @param waitDuration
-     * @return
-     */
-    public NodeRebInfo addNodeRelInfo(String clientId, int waitDuration) {
-        NodeRebInfo nodeRebInfo = this.rebalanceMap.get(clientId);
-        if (nodeRebInfo != null) {
-            if (nodeRebInfo.getStatus() == 4) {
-                this.rebalanceMap.remove(clientId);
-                nodeRebInfo = null;
-            } else {
-                return nodeRebInfo;
-            }
-        }
-        if (consumerInfoMap.containsKey(clientId)) {
-            NodeRebInfo tmpNodeInfo = new NodeRebInfo(clientId, waitDuration);
-            nodeRebInfo = this.rebalanceMap.putIfAbsent(clientId, tmpNodeInfo);
-            if (nodeRebInfo == null) {
-                nodeRebInfo = tmpNodeInfo;
-            }
-        }
-        return nodeRebInfo;
-    }
-
-    public RebProcessInfo getNeedRebNodeList() {
-        List<String> needProcessList = new ArrayList<>();
-        List<String> needEscapeList = new ArrayList<>();
-        List<String> needRemoved = new ArrayList<>();
-        for (NodeRebInfo nodeRebInfo : this.rebalanceMap.values()) {
-            if (nodeRebInfo.getStatus() == 0) {
-                nodeRebInfo.setStatus(1);
-                needProcessList.add(nodeRebInfo.getClientId());
-            } else {
-                if (nodeRebInfo.getReqType() == 1
-                        && nodeRebInfo.getStatus() == 2) {
-                    if (nodeRebInfo.decrAndGetWaitDuration() <= 0) {
-                        nodeRebInfo.setStatus(4);
-                        needRemoved.add(nodeRebInfo.getClientId());
-                    } else {
-                        needEscapeList.add(nodeRebInfo.getClientId());
-                    }
-                }
-            }
-        }
-        for (String clientId : needRemoved) {
-            this.rebalanceMap.remove(clientId);
-        }
-        return new RebProcessInfo(needProcessList, needEscapeList);
-    }
-
-    public void setRebNodeProcessed(List<String> processList) {
-        if (processList == null
-                || processList.isEmpty()
-                || this.rebalanceMap.isEmpty()) {
-            return;
-        }
-        List<String> needRemoved = new ArrayList<>();
-        for (NodeRebInfo nodeRebInfo : this.rebalanceMap.values()) {
-            if (processList.contains(nodeRebInfo.getClientId())) {
-                if (nodeRebInfo.getReqType() == 0) {
-                    nodeRebInfo.setStatus(4);
-                    needRemoved.add(nodeRebInfo.getClientId());
-                } else {
-                    nodeRebInfo.setStatus(2);
-                }
-            }
-        }
-        for (String clientId : needRemoved) {
-            this.rebalanceMap.remove(clientId);
-        }
-        return;
-    }
-
-    public boolean isBandConsume() {
-        return isBandConsume;
-    }
-
-    public String getSessionKey() {
-        return sessionKey;
-    }
-
-    public boolean isNotAllocate() {
-        return notAllocate.get();
-    }
-
-    public int getAllocatedTimes() {
-        return allocatedTimes.get();
-    }
-
-    public void addAllocatedTimes() {
-        this.allocatedTimes.incrementAndGet();
-    }
-
-    public boolean isGroupFullSize() {
-        return this.consumerInfoMap.size() >= this.sourceCount;
-    }
-
-    public boolean isRebalanceMapEmpty() {
-        return (this.rebalanceMap == null
-                || this.rebalanceMap.isEmpty());
-    }
-
-    public boolean isBandPartsEmpty() {
-        return this.partitionInfoMap.isEmpty();
-    }
-
-    public void settAllocated() {
-        this.notAllocate.compareAndSet(true, false);
-    }
-
-    public long getSessionTime() {
-        return sessionTime;
-    }
-
-    public int getSourceCount() {
-        return sourceCount;
-    }
-
-    public long getCreateTime() {
-        return createTime;
-    }
-
-    public long getCurCheckCycle() {
-        return curCheckCycle.get();
-    }
-
-    public long addCurCheckCycle() {
-        return curCheckCycle.addAndGet(1);
-    }
-
-    /**
-     * Set current consumer  broker/client ratio
-     *
-     * @param defBClientRate
-     * @param confBClientRate
-     * @param curBClientRate
-     * @param minRequireClientCnt
-     * @param isRebalanced
-     */
-    public void setCurrConsumeBClientInfo(int defBClientRate,
-                                          int confBClientRate,
-                                          int curBClientRate,
-                                          int minRequireClientCnt,
-                                          boolean isRebalanced) {
-        this.defBClientRate = defBClientRate;
-        this.confBClientRate = confBClientRate;
-        this.curBClientRate = curBClientRate;
-        this.minRequireClientCnt = minRequireClientCnt;
-        this.rebalanceCheckPrint = false;
-        if (isRebalanced) {
-            this.rebalanceCheckStatus = 1;
-        } else {
-            this.rebalanceCheckStatus = 0;
-        }
-    }
-
-    public boolean isRebalanceCheckPrint() {
-        return rebalanceCheckPrint;
-    }
-
-    public AtomicBoolean getNotAllocate() {
-        return notAllocate;
-    }
-
-    public int getDefBClientRate() {
-        return defBClientRate;
-    }
-
-    public int getConfBClientRate() {
-        return confBClientRate;
-    }
-
-    public int getCurBClientRate() {
-        return curBClientRate;
-    }
-
-    public int getMinRequireClientCnt() {
-        return minRequireClientCnt;
-    }
-
-    public int getRebalanceCheckStatus() {
-        return rebalanceCheckStatus;
-    }
-
-    public Set<String> getTopicSet() {
-        return this.topicSet;
-    }
-
-    public boolean isSelectedBig() {
-        return isSelectedBig;
-    }
-
-    public Map<String, TreeSet<String>> getTopicConditions() {
-        return this.topicConditions;
-    }
-
-    public List<ConsumerInfo> getConsumerInfoList() {
-        List<ConsumerInfo> result = new ArrayList<>();
-        result.addAll(this.consumerInfoMap.values());
-        return result;
-    }
-
-    public List<ConsumerInfo> cloneConsumerInfoList() {
-        List<ConsumerInfo> result = new ArrayList<>();
-        for (ConsumerInfo consumer : this.consumerInfoMap.values()) {
-            if (consumer != null) {
-                result.add(consumer.clone());
-            }
-        }
-        return result;
-    }
-
-    public ConsumerInfo getConsumerInfo(String consumerId) {
-        return consumerInfoMap.get(consumerId);
-    }
-
-    public Map<String, String> getPartitionInfoMap() {
-        return this.partitionInfoMap;
-    }
-
-    public Map<String, Long> getPartOffsetMap() {
-        return this.partOffsetMap;
-    }
-
-    public Map<String, NodeRebInfo> getRebalanceMap() {
-        return this.rebalanceMap;
-    }
-
-    @Override
-    public ConsumerBandInfo clone() {
-        // no need to deep clone
-        return new ConsumerBandInfo(this.isBandConsume, this.sessionKey,
-                this.sessionTime, this.sourceCount,
-                this.createTime, this.curCheckCycle.get(),
-                this.notAllocate.get(), this.allocatedTimes.get(),
-                this.isSelectedBig,
-                this.topicSet, this.topicConditions,
-                this.consumerInfoMap, this.partitionInfoMap,
-                this.partOffsetMap, this.rebalanceMap,
-                this.defBClientRate, this.confBClientRate,
-                this.curBClientRate, this.minRequireClientCnt,
-                this.rebalanceCheckStatus, this.rebalanceCheckPrint);
-    }
-
-}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
index 9622c91..e3b0046 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,7 +89,7 @@ public class ConsumerEventManager {
      */
     public ConsumerEvent peek(String consumerId) {
         String group =
-                consumerHolder.getGroup(consumerId);
+                consumerHolder.getGroupName(consumerId);
         if (group != null) {
             ConcurrentHashMap<String, LinkedList<ConsumerEvent>> currentEventMap =
                     hasDisconnectEvent(group)
@@ -119,7 +118,7 @@ public class ConsumerEventManager {
      */
     public ConsumerEvent removeFirst(String consumerId) {
         ConsumerEvent event = null;
-        String group = consumerHolder.getGroup(consumerId);
+        String group = consumerHolder.getGroupName(consumerId);
         ConcurrentHashMap<String, LinkedList<ConsumerEvent>> currentEventMap =
                 hasDisconnectEvent(group) ? disconnectEventMap : connectEventMap;
         LinkedList<ConsumerEvent> eventList = currentEventMap.get(consumerId);
@@ -211,16 +210,19 @@ public class ConsumerEventManager {
     /**
      * Check if disconnect event map have event
      *
-     * @param group
+     * @param group    the name of the group to query
      * @return true if disconnect event map not empty otherwise false
      */
     public boolean hasDisconnectEvent(String group) {
-        List<ConsumerInfo> consumerList =
-                consumerHolder.getConsumerList(group);
-        if (CollectionUtils.isNotEmpty(consumerList)) {
-            for (ConsumerInfo consumer : consumerList) {
+        List<String> consumerIdList =
+                consumerHolder.getConsumerIdList(group);
+        if (CollectionUtils.isNotEmpty(consumerIdList)) {
+            for (String consumerId : consumerIdList) {
+                if (consumerId == null) {
+                    continue;
+                }
                 List<ConsumerEvent> eventList =
-                        disconnectEventMap.get(consumer.getConsumerId());
+                        disconnectEventMap.get(consumerId);
                 if (eventList != null) {
                     synchronized (eventList) {
                         if (CollectionUtils.isNotEmpty(eventList)) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
new file mode 100644
index 0000000..4fbcedf
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.server.common.utils.ClientSyncInfo;
+
+/**
+ * Master-side description of a single consumer client that belongs to a consume group.
+ *
+ * <p>The consumer id, group, consume type, topic set and filter-condition map are held in
+ * final fields (the passed-in collections are stored by reference, not copied). The TLS flag,
+ * the node id and the client-reported values can be refreshed afterwards through
+ * {@link #updCurConsumerInfo(ConsumerInfo)} and {@link #updClientReportInfo(long, long, long)}.
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} are based on the consumer id only,
+ * whereas {@link #compareTo(ConsumerInfo)} orders by consumer id and then by group name.
+ */
+public class ConsumerInfo implements Comparable<ConsumerInfo>, Serializable {
+
+    private static final long serialVersionUID = 3095734962491009712L;
+
+    private final String consumerId;
+    // text produced by toString() at construction time; it is not rebuilt
+    // when overTLS is changed later by updCurConsumerInfo()
+    private final String consumerViewInfo;
+    private final ConsumeType consumeType;
+    private final String group;
+    private final Set<String> topicSet;
+    private final Map<String, TreeSet<String>> topicConditions;
+    private boolean overTLS = false;
+    private long startTime = TBaseConstants.META_VALUE_UNDEFINED;
+    private int sourceCount = TBaseConstants.META_VALUE_UNDEFINED;
+    private int nodeId = TBaseConstants.META_VALUE_UNDEFINED;
+    // for band consume type node
+    private String sessionKey = "";
+    //select the bigger offset when offset conflict
+    private boolean selectedBig = true;
+    private Map<String, Long> requiredPartition;
+    // for client balance node
+    private long csmFromMaxOffsetCtrlId = TBaseConstants.META_VALUE_UNDEFINED;
+    private long lstAssignedTime = TBaseConstants.META_VALUE_UNDEFINED;
+    private long usedTopicMetaInfoId = TBaseConstants.META_VALUE_UNDEFINED;
+
+    /**
+     * Build a consumer description that carries the session key, the offset-selection
+     * flag and the required-partition map (the fields this class marks as used by
+     * band consume type nodes).
+     *
+     * @param consumerId         the consumer id
+     * @param overTLS            whether the consumer is flagged as over TLS
+     * @param group              the group name
+     * @param topicSet           the topic set, stored by reference
+     * @param topicConditions    the filter conditions per topic; null is replaced by an empty map
+     * @param consumeType        the consume type
+     * @param sessionKey         the session key
+     * @param startTime          the start time
+     * @param sourceCount        the source count
+     * @param selectedBig        whether to select the bigger offset when offsets conflict
+     * @param requiredPartition  the required partition map, stored by reference
+     */
+    public ConsumerInfo(String consumerId, boolean overTLS, String group,
+                        Set<String> topicSet, Map<String, TreeSet<String>> topicConditions,
+                        ConsumeType consumeType, String sessionKey, long startTime,
+                        int sourceCount, boolean selectedBig,
+                        Map<String, Long> requiredPartition) {
+        this.group = group;
+        this.consumeType = consumeType;
+        this.consumerId = consumerId;
+        this.overTLS = overTLS;
+        this.topicSet = topicSet;
+        if (topicConditions == null) {
+            this.topicConditions = new HashMap<>();
+        } else {
+            this.topicConditions = topicConditions;
+        }
+        this.sessionKey = sessionKey;
+        this.selectedBig = selectedBig;
+        this.startTime = startTime;
+        this.sourceCount = sourceCount;
+        this.requiredPartition = requiredPartition;
+        // must stay last: toString() reads the fields assigned above
+        this.consumerViewInfo = toString();
+    }
+
+    /**
+     * Build a consumer description that carries a node id and the values reported
+     * through a {@link ClientSyncInfo} (the fields this class marks as used by
+     * client balance nodes). The start time is set to the current system time.
+     *
+     * @param consumerId         the consumer id
+     * @param overTLS            whether the consumer is flagged as over TLS
+     * @param group              the group name
+     * @param consumeType        the consume type
+     * @param sourceCount        the source count
+     * @param nodeId             the node id
+     * @param topicSet           the topic set, stored by reference
+     * @param topicConditions    the filter conditions per topic; null is replaced by an empty map
+     * @param curCsmCtrlId       the consume control id; ignored when it equals
+     *                           {@code TBaseConstants.META_VALUE_UNDEFINED}
+     * @param syncInfo           the client sync info; dereferenced without a null check
+     */
+    public ConsumerInfo(String consumerId, boolean overTLS, String group,
+                        ConsumeType consumeType, int sourceCount, int nodeId,
+                        Set<String> topicSet, Map<String, TreeSet<String>> topicConditions,
+                        long curCsmCtrlId, ClientSyncInfo syncInfo) {
+        this.group = group;
+        this.consumeType = consumeType;
+        this.consumerId = consumerId;
+        this.overTLS = overTLS;
+        this.topicSet = topicSet;
+        if (topicConditions == null) {
+            this.topicConditions = new HashMap<>();
+        } else {
+            this.topicConditions = topicConditions;
+        }
+        this.sourceCount = sourceCount;
+        this.nodeId = nodeId;
+        this.startTime = System.currentTimeMillis();
+        if (curCsmCtrlId != TBaseConstants.META_VALUE_UNDEFINED) {
+            this.csmFromMaxOffsetCtrlId = curCsmCtrlId;
+        }
+        updClientReportInfo(curCsmCtrlId,
+                syncInfo.getLstAssignedTime(), syncInfo.getTopicMetaInfoId());
+        // must stay last: toString() reads the fields assigned above
+        this.consumerViewInfo = toString();
+    }
+
+    /**
+     * Refresh this object from another description of the same consumer.
+     *
+     * <p>Nothing is changed unless the incoming object's consume type is
+     * {@link ConsumeType#CONSUME_CLIENT_REB}; in that case the TLS flag, the node id
+     * and the client-reported values are taken over.
+     *
+     * @param inCsmInfo   the incoming consumer description
+     */
+    public void updCurConsumerInfo(ConsumerInfo inCsmInfo) {
+        if (inCsmInfo.getConsumeType() != ConsumeType.CONSUME_CLIENT_REB) {
+            return;
+        }
+        this.overTLS = inCsmInfo.overTLS;
+        this.nodeId = inCsmInfo.getNodeId();
+        updClientReportInfo(inCsmInfo.getCsmFromMaxOffsetCtrlId(),
+                inCsmInfo.getLstAssignedTime(), inCsmInfo.getUsedTopicMetaInfoId());
+    }
+
+    /**
+     * Store the values reported by the client, skipping values that carry no information.
+     *
+     * @param lstCsmCtrlId         the control id; applied only when it is not negative
+     * @param lastAssignedTime     the last assigned time; applied only when it differs from
+     *                             {@code TBaseConstants.META_VALUE_UNDEFINED}
+     * @param usedTopicMetaInfoId  the topic meta info id; applied only when it differs from
+     *                             {@code TBaseConstants.META_VALUE_UNDEFINED}
+     */
+    public void updClientReportInfo(long lstCsmCtrlId,
+                                    long lastAssignedTime,
+                                    long usedTopicMetaInfoId) {
+        if (lstCsmCtrlId >= 0 && lstCsmCtrlId != this.csmFromMaxOffsetCtrlId) {
+            this.csmFromMaxOffsetCtrlId = lstCsmCtrlId;
+        }
+        if (lastAssignedTime != TBaseConstants.META_VALUE_UNDEFINED
+                && this.lstAssignedTime != lastAssignedTime) {
+            this.lstAssignedTime = lastAssignedTime;
+        }
+        if (usedTopicMetaInfoId != TBaseConstants.META_VALUE_UNDEFINED
+                && this.usedTopicMetaInfoId != usedTopicMetaInfoId) {
+            this.usedTopicMetaInfoId = usedTopicMetaInfoId;
+        }
+    }
+
+    /**
+     * Render the consumer as {@code consumerId@group:topic1,topic2@overTLS=flag}.
+     *
+     * @return the text form of this consumer
+     */
+    @Override
+    public String toString() {
+        StringBuilder sBuilder =
+                new StringBuilder(512).append(consumerId)
+                        .append("@").append(group).append(":");
+        int count = 0;
+        for (String topicItem : topicSet) {
+            if (count++ > 0) {
+                sBuilder.append(",");
+            }
+            sBuilder.append(topicItem);
+        }
+        sBuilder.append("@overTLS=").append(overTLS);
+        return sBuilder.toString();
+    }
+
+    /**
+     * Two consumer descriptions are equal when their consumer ids are equal.
+     *
+     * @param obj   the object to compare with
+     * @return      true if obj is a ConsumerInfo with the same consumer id
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        // instanceof is false for null, so no separate null check is needed
+        if (!(obj instanceof ConsumerInfo)) {
+            return false;
+        }
+        final ConsumerInfo info = (ConsumerInfo) obj;
+        return (this.consumerId.equals(info.getConsumerId()));
+    }
+
+    /**
+     * Hash on the consumer id, the only field {@link #equals(Object)} looks at,
+     * so that equal objects always produce equal hash codes.
+     *
+     * @return the hash code of the consumer id, or 0 when the id is null
+     */
+    @Override
+    public int hashCode() {
+        return (consumerId == null) ? 0 : consumerId.hashCode();
+    }
+
+    /**
+     * Order by consumer id first and by group name second.
+     *
+     * <p>Note: this can return a non-zero value for two objects that
+     * {@link #equals(Object)} treats as equal (same consumer id, different group).
+     *
+     * @param o   the consumer description to compare with
+     * @return    a negative, zero or positive value following the ordering above
+     */
+    @Override
+    public int compareTo(ConsumerInfo o) {
+        if (!this.consumerId.equals(o.consumerId)) {
+            return this.consumerId.compareTo(o.consumerId);
+        }
+        if (!this.group.equals(o.group)) {
+            return this.group.compareTo(o.group);
+        }
+        return 0;
+    }
+
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    public String getGroupName() {
+        return group;
+    }
+
+    /**
+     * @return the topic set held by this object (the internal reference, not a copy)
+     */
+    public Set<String> getTopicSet() {
+        return topicSet;
+    }
+
+    /**
+     * @return the filter-condition map held by this object (the internal reference, not a copy)
+     */
+    public Map<String, TreeSet<String>> getTopicConditions() {
+        return topicConditions;
+    }
+
+    public boolean isOverTLS() {
+        return overTLS;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public int getSourceCount() {
+        return this.sourceCount;
+    }
+
+    public int getNodeId() {
+        return this.nodeId;
+    }
+
+    public long getCsmFromMaxOffsetCtrlId() {
+        return csmFromMaxOffsetCtrlId;
+    }
+
+    /**
+     * @return true if the consume type is {@link ConsumeType#CONSUME_BAND}
+     */
+    public boolean isRequireBound() {
+        return this.consumeType == ConsumeType.CONSUME_BAND;
+    }
+
+    public String getSessionKey() {
+        return this.sessionKey;
+    }
+
+    public void setSessionKey(String sessionKey) {
+        this.sessionKey = sessionKey;
+    }
+
+    /**
+     * @return the required partition map; null when the object was built through
+     *         the constructor that takes a {@link ClientSyncInfo}
+     */
+    public Map<String, Long> getRequiredPartition() {
+        return this.requiredPartition;
+    }
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+    public boolean isSelectedBig() {
+        return selectedBig;
+    }
+
+    /**
+     * @return the text form captured when this object was constructed
+     */
+    public String getConsumerViewInfo() {
+        return consumerViewInfo;
+    }
+
+    public long getLstAssignedTime() {
+        return lstAssignedTime;
+    }
+
+    public long getUsedTopicMetaInfoId() {
+        return usedTopicMetaInfoId;
+    }
+
+    /**
+     * @return a new tuple holding the consumer id and the current TLS flag
+     */
+    public Tuple2<String, Boolean> getConsumerIdAndTlsInfoTuple() {
+        return new Tuple2<>(consumerId, overTLS);
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index 995140d..fea5dcb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -17,464 +17,401 @@
 
 package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
+import org.apache.inlong.tubemq.server.common.utils.RowLock;
+import org.apache.inlong.tubemq.server.master.MasterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConsumerInfoHolder {
 
-    private final ConcurrentHashMap<String/* group */, ConsumerBandInfo> groupInfoMap =
+    private static final Logger logger =
+            LoggerFactory.getLogger(ConsumerInfoHolder.class);
+    private final MasterConfig masterConfig;     // master configure
+    private final RowLock groupRowLock;    //lock
+    private final ConcurrentHashMap<String/* group */, ConsumeGroupInfo> groupInfoMap =
             new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* consumerId */, String/* group */> consumerIndexMap =
             new ConcurrentHashMap<>();
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final ConcurrentHashSet<String/* group */> serverBalanceGroupSet =
+            new ConcurrentHashSet<>();
+    private final ConcurrentHashSet<String/* group */> clientBalanceGroupSet =
+            new ConcurrentHashSet<>();
+
+    public ConsumerInfoHolder(MasterConfig masterConfig) {
+        this.masterConfig = masterConfig;
+        this.groupRowLock = new RowLock("Group-RowLock",
+                this.masterConfig.getRowLockWaitDurMs());
+    }
+
+    public int getDefResourceRate() {
+        return masterConfig.getMaxGroupBrokerConsumeRate();
+    }
+
+    /**
+     * Judge whether the consumer group is empty
+     *
+     * @param groupName group name
+     * @return true: empty, false: not empty
+     */
+    public boolean isConsumeGroupEmpty(String groupName) {
+        if (groupName == null) {
+            return true;
+        }
+        ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(groupName);
+        return (consumeGroupInfo == null || consumeGroupInfo.isGroupEmpty());
+    }
 
     /**
-     * Get consumer info list in a group
+     * Get consumer id list in a group
      *
      * @param group the consumer group name
-     * @return a consumer list
+     * @return the consumer id list of the group
      */
-    public List<ConsumerInfo> getConsumerList(String group) {
+    public List<String> getConsumerIdList(String group) {
         if (group == null) {
             return Collections.emptyList();
         }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo oldConsumeBandInfo =
-                    groupInfoMap.get(group);
-            if (oldConsumeBandInfo != null) {
-                List<ConsumerInfo> oldConsumerList =
-                        oldConsumeBandInfo.getConsumerInfoList();
-                if (oldConsumerList != null) {
-                    List<ConsumerInfo> consumerList =
-                            new ArrayList<>(oldConsumerList.size());
-                    for (ConsumerInfo consumer : oldConsumerList) {
-                        if (consumer != null) {
-                            consumerList.add(consumer.clone());
-                        }
-                    }
-                    return consumerList;
-                }
-            }
-        } finally {
-            rwLock.readLock().unlock();
+        ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+        if (consumeGroupInfo == null) {
+            return Collections.emptyList();
+        }
+        return consumeGroupInfo.getConsumerIdList();
+    }
+
+    /**
+     * Get the client information of the consumer group
+     *
+     * The query content of this API is the content presented when
+     *  the Web API queries the client information.
+     *
+     * @param group the consumer group name
+     * @return the consumer id with subscribed topic and link type of the group
+     */
+    public List<String> getConsumerViewList(String group) {
+        if (group == null) {
+            return Collections.emptyList();
         }
-        return Collections.emptyList();
+        ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+        if (consumeGroupInfo == null) {
+            return Collections.emptyList();
+        }
+        return consumeGroupInfo.getConsumerViewInfos();
     }
 
     /**
-     * Get consumer band info
+     * Get the consumerId and tls information of the consumer group
+     *
+     * include consumerId and isOverTLS information.
+     *
+     * @param group the consumer group name
+     * @return the consumer info of the group
+     */
+    public List<Tuple2<String, Boolean>> getConsumerIdAndTlsInfos(String group) {
+        if (group == null) {
+            return Collections.emptyList();
+        }
+        ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+        if (consumeGroupInfo == null) {
+            return Collections.emptyList();
+        }
+        return consumeGroupInfo.getConsumerIdAndTlsInfos();
+    }
+
+    /**
+     * Get consume group information
      *
      * @param group group name
-     * @return a ConsumerBandInfo
+     * @return consume group information
      */
-    public ConsumerBandInfo getConsumerBandInfo(String group) {
+    public ConsumeGroupInfo getConsumeGroupInfo(String group) {
         if (group == null) {
             return null;
         }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo oldConsumeBandInfo =
-                    groupInfoMap.get(group);
-            if (oldConsumeBandInfo != null) {
-                return oldConsumeBandInfo.clone();
-            }
-        } finally {
-            rwLock.readLock().unlock();
-        }
-        return null;
+        return groupInfoMap.get(group);
     }
 
     /**
      * Add current check cycle
      *
      * @param group group name
-     * @return
+     * @return updated check cycle value
      */
     public Long addCurCheckCycle(String group) {
         if (group == null) {
             return null;
         }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo oldConsumeBandInfo =
-                    groupInfoMap.get(group);
-            if (oldConsumeBandInfo != null) {
-                return oldConsumeBandInfo.addCurCheckCycle();
-            }
-        } finally {
-            rwLock.readLock().unlock();
+        ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+        if (consumeGroupInfo != null) {
+            return consumeGroupInfo.addCurCheckCycle();
         }
         return null;
     }
 
     /**
-     * Set current broker/client ratio
+     * get subscribed topic set of group
      *
-     * @param group               group name
-     * @param defBClientRate      default broker/client ratio
-     * @param confBClientRate     config broker/client ratio
-     * @param curBClientRate      current broker/client ratio
-     * @param minRequireClientCnt minimal client count
-     * @param isRebalanced        if need re-balance
+     * @param group group name
+     * @return subscribed topic set
      */
-    public void setCurConsumeBClientInfo(String group, int defBClientRate,
-                                         int confBClientRate, int curBClientRate,
-                                         int minRequireClientCnt, boolean isRebalanced) {
+    public Set<String> getGroupTopicSet(String group) {
         if (group == null) {
-            return;
+            return Collections.emptySet();
         }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo oldConsumeBandInfo =
-                    groupInfoMap.get(group);
-            if (oldConsumeBandInfo != null) {
-                oldConsumeBandInfo
-                        .setCurrConsumeBClientInfo(defBClientRate,
-                                confBClientRate, curBClientRate,
-                                minRequireClientCnt, isRebalanced);
+        ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+        if (consumeGroupInfo != null) {
+            return consumeGroupInfo.getTopicSet();
+        }
+        return Collections.emptySet();
+    }
+
+    /**
+     * get current consumer count of group
+     *
+     * @param group group name
+     * @return consumer count
+     */
+    public int getConsumerCnt(String group) {
+        int count = 0;
+        if (group != null) {
+            ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+            if (consumeGroupInfo != null) {
+                count = consumeGroupInfo.getGroupCnt();
             }
-        } finally {
-            rwLock.readLock().unlock();
         }
-        return;
+        return count;
     }
 
     /**
-     * Add allocate time
+     * get need rebalanced consumer of group
      *
      * @param group group name
+     * @return need rebalanced consumer
      */
-    public void addAllocatedTimes(String group) {
+    public RebProcessInfo getNeedRebNodeList(String group) {
+        RebProcessInfo rebProcessInfo = new RebProcessInfo();
         if (group == null) {
-            return;
+            return rebProcessInfo;
         }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo oldConsumeBandInfo =
-                    groupInfoMap.get(group);
-            if (oldConsumeBandInfo != null) {
-                oldConsumeBandInfo.addAllocatedTimes();
-            }
-        } finally {
-            rwLock.readLock().unlock();
+        ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+        if (consumeGroupInfo != null) {
+            rebProcessInfo = consumeGroupInfo.getNeedBalanceNodes();
         }
-        return;
+        return rebProcessInfo;
     }
 
     /**
-     * Set allocated value
+     * set rebalanced consumer id of group
      *
      * @param group group name
+     * @param processList rebalanced consumer id
      */
-    public void setAllocated(String group) {
+    public void setRebNodeProcessed(String group,
+                                    List<String> processList) {
         if (group == null) {
             return;
         }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo oldConsumeBandInfo =
-                    groupInfoMap.get(group);
-            if (oldConsumeBandInfo != null) {
-                oldConsumeBandInfo.settAllocated();
+        ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+        if (consumeGroupInfo != null) {
+            consumeGroupInfo.setBalanceNodeProcessed(processList);
+        }
+    }
+
+    /**
+     * Book the consumers of the group that need to be rebalanced
+     *
+     * @param group group name, ignored if null or not registered
+     * @param consumerIdSet ids of the consumers to rebalance, only ids indexed under this group are booked
+     * @param waitDuration wait duration passed on to the group for each booked consumer
+     */
+    public void addRebConsumerInfo(String group, Set<String> consumerIdSet, int waitDuration) {
+        ConsumeGroupInfo consumeGroupInfo = (group == null) ? null : groupInfoMap.get(group);
+        if (consumeGroupInfo != null && consumerIdSet != null) {
+            for (String consumerId : consumerIdSet) {
+                String oldGroup = consumerIndexMap.get(consumerId);
+                if (group.equals(oldGroup)) {
+                    consumeGroupInfo.addNodeRelInfo(consumerId, waitDuration);
+                }
             }
-        } finally {
-            rwLock.readLock().unlock();
         }
-        return;
     }
 
     /**
      * Check if allocated
      *
      * @param group group name
-     * @return if not allocated
+     * @return allocate status
      */
     public boolean isNotAllocated(String group) {
         if (group == null) {
             return false;
         }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo oldConsumeBandInfo =
-                    groupInfoMap.get(group);
-            if (oldConsumeBandInfo != null) {
-                return oldConsumeBandInfo.isNotAllocate();
-            }
-        } finally {
-            rwLock.readLock().unlock();
+        ConsumeGroupInfo consumeGroupInfo =
+                groupInfoMap.get(group);
+        if (consumeGroupInfo != null) {
+            return consumeGroupInfo.isNotAllocate();
         }
         return false;
     }
 
-    public Set<String> getGroupTopicSet(String group) {
-        if (group != null) {
-            try {
-                rwLock.readLock().lock();
-                ConsumerBandInfo oldConsumeBandInfo =
-                        groupInfoMap.get(group);
-                if (oldConsumeBandInfo != null) {
-                    return oldConsumeBandInfo.getTopicSet();
-                }
-            } finally {
-                rwLock.readLock().unlock();
-            }
+    /**
+     * get group name of consumer
+     *
+     * @param consumerId consumer id
+     * @return the group name of consumer
+     */
+    public String getGroupName(String consumerId) {
+        return consumerIndexMap.get(consumerId);
+    }
+
+    /**
+     * get all registered group name
+     *
+     * @return the group name registered
+     */
+    public List<String> getAllGroupName() {
+        if (groupInfoMap.isEmpty()) {
+            return Collections.emptyList();
         }
-        return Collections.emptySet();
+        return new ArrayList<>(groupInfoMap.keySet());
     }
 
     /**
-     * Get the group's subscribed topics and ConsumerInfos.
+     * get all server-balance group name
      *
-     * @param group   group to be queried
-     * @param result  query result, only read
-     * @return Has the data been found
+     * @return the names of all server-balance groups
      */
-    public boolean getGroupTopicSetAndConsumerInfos(
-            String group, Tuple2<Set<String>, List<ConsumerInfo>> result) {
-        if (group != null) {
-            try {
-                rwLock.readLock().lock();
-                ConsumerBandInfo curGroupInfo =
-                        groupInfoMap.get(group);
-                if (curGroupInfo != null) {
-                    result.setF0AndF1(curGroupInfo.getTopicSet(),
-                            curGroupInfo.cloneConsumerInfoList());
-                    return true;
-                }
-            } finally {
-                rwLock.readLock().unlock();
-            }
+    public List<String> getAllServerBalanceGroups() {
+        if (serverBalanceGroupSet.isEmpty()) {
+            return Collections.emptyList();
         }
-        result.setF0AndF1(Collections.emptySet(), Collections.emptyList());
-        return false;
+        return new ArrayList<>(serverBalanceGroupSet);
     }
 
     /**
-     * Get the group's subscribed topics and client count.
+     * get all client-balance group name
      *
-     * @param group   group to be queried
-     * @param result  query result, only read
-     * @return Has the data been found
+     * @return the names of all client-balance groups
      */
-    public boolean getGroupTopicSetAndClientCnt(String group,
-                                                Tuple2<Set<String>, Integer> result) {
-        if (group != null) {
-            try {
-                rwLock.readLock().lock();
-                ConsumerBandInfo curGroupInfo =
-                        groupInfoMap.get(group);
-                if (curGroupInfo != null) {
-                    result.setF0AndF1(curGroupInfo.getTopicSet(),
-                            curGroupInfo.getGroupCnt());
-                    return true;
-                }
-            } finally {
-                rwLock.readLock().unlock();
-            }
+    public List<String> getAllClientBalanceGroups() {
+        if (clientBalanceGroupSet.isEmpty()) {
+            return Collections.emptyList();
         }
-        result.setF0AndF1(Collections.emptySet(), 0);
-        return false;
+        return new ArrayList<>(clientBalanceGroupSet);
     }
 
     /**
-     * Add a consumer into consumer band info, if consumer band info not exist, will create a new one
+     * get the consumer info of the specified consumer id
      *
-     * @param consumer       consumer info
-     * @param isNotAllocated if not allocated
-     * @param isSelectedBig  select big data
-     * @return a ConsumerBandInfo
+     * @param consumerId  the consumer id
+     * @return the consumer info
      */
-    public ConsumerBandInfo addConsumer(ConsumerInfo consumer,
-                                        boolean isNotAllocated,
-                                        boolean isSelectedBig) {
-        ConsumerBandInfo consumeBandInfo = null;
-        String group = consumer.getGroup();
-        try {
-            rwLock.writeLock().lock();
-            consumeBandInfo = groupInfoMap.get(group);
-            if (consumeBandInfo == null) {
-                ConsumerBandInfo tmpBandInfo =
-                        new ConsumerBandInfo(isSelectedBig);
-                consumeBandInfo =
-                        groupInfoMap.putIfAbsent(group, tmpBandInfo);
-                if (consumeBandInfo == null) {
-                    consumeBandInfo = tmpBandInfo;
-                }
-            }
-            consumeBandInfo.addConsumer(consumer);
-            if (!isNotAllocated) {
-                consumeBandInfo.settAllocated();
+    public ConsumerInfo getConsumerInfo(String consumerId) {
+        ConsumerInfo consumerInfo = null;
+        String groupName = consumerIndexMap.get(consumerId);
+        if (groupName != null) {
+            ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(groupName);
+            if (consumeGroupInfo != null) {
+                consumerInfo = consumeGroupInfo.getConsumerInfo(consumerId);
             }
-            consumerIndexMap.put(consumer.getConsumerId(), group);
-        } finally {
-            rwLock.writeLock().unlock();
         }
-        return consumeBandInfo;
+        return consumerInfo;
     }
 
-    public void addRebConsumerInfo(String group,
-                                   Set<String> consumerIdSet,
-                                   int waitDuration) {
+    /**
+     * Add consumer and return group object,
+     * if the consumer is the first one, then create the group object
+     *
+     * @param consumer consumer info
+     * @param isNotAllocated whether balanced
+     * @param sBuffer  string buffer
+     * @param result   check result
+     * @return process result
+     */
+    public boolean addConsumer(ConsumerInfo consumer, boolean isNotAllocated,
+                               StringBuilder sBuffer, ParamCheckResult result) {
+        ConsumeGroupInfo consumeGroupInfo = null;
+        String group = consumer.getGroupName();
+        Integer lid = null;
         try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo consumeBandInfo =
-                    groupInfoMap.get(group);
-            if (consumeBandInfo != null) {
-                for (String consumerId : consumerIdSet) {
-                    String oldGroup = consumerIndexMap.get(consumerId);
-                    if (group.equals(oldGroup)) {
-                        consumeBandInfo.addNodeRelInfo(consumerId, waitDuration);
+            lid = groupRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(group), true);
+            consumeGroupInfo = groupInfoMap.get(group);
+            if (consumeGroupInfo == null) {
+                ConsumeGroupInfo tmpGroupInfo = new ConsumeGroupInfo(consumer);
+                consumeGroupInfo = groupInfoMap.putIfAbsent(group, tmpGroupInfo);
+                if (consumeGroupInfo == null) {
+                    consumeGroupInfo = tmpGroupInfo;
+                    if (tmpGroupInfo.isClientBalance()) {
+                        clientBalanceGroupSet.add(group);
+                    } else {
+                        serverBalanceGroupSet.add(group);
                     }
                 }
             }
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    public RebProcessInfo getNeedRebNodeList(String group) {
-        RebProcessInfo rebProcessInfo = new RebProcessInfo();
-        if (group == null) {
-            return rebProcessInfo;
-        }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo consumeBandInfo =
-                    groupInfoMap.get(group);
-            if (consumeBandInfo != null) {
-                rebProcessInfo = consumeBandInfo.getNeedRebNodeList();
+            if (consumeGroupInfo.addConsumer(consumer, sBuffer, result)) {
+                if (!isNotAllocated) {
+                    consumeGroupInfo.settAllocated();
+                }
+                consumerIndexMap.put(consumer.getConsumerId(), group);
+                result.setCheckData(consumeGroupInfo);
             }
+        } catch (IOException e) {
+            logger.warn("Failed to lock.", e);
         } finally {
-            rwLock.readLock().unlock();
-        }
-        return rebProcessInfo;
-    }
-
-    public void setRebNodeProcessed(String group,
-                                    List<String> processList) {
-        if (group == null) {
-            return;
-        }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo consumeBandInfo =
-                    groupInfoMap.get(group);
-            if (consumeBandInfo != null) {
-                consumeBandInfo.setRebNodeProcessed(processList);
+            if (lid != null) {
+                groupRowLock.releaseRowLock(lid);
             }
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    public boolean exist(String consumerId) {
-        boolean isExist = false;
-        try {
-            rwLock.readLock().lock();
-            isExist = consumerIndexMap.containsKey(consumerId);
-        } finally {
-            rwLock.readLock().unlock();
         }
-        return isExist;
+        return result.result;
     }
 
-    public String getGroup(String consumerId) {
-        String groupName = null;
-        try {
-            rwLock.readLock().lock();
-            groupName = consumerIndexMap.get(consumerId);
-        } finally {
-            rwLock.readLock().unlock();
-        }
-        return groupName;
-    }
-
-    public ConsumerInfo removeConsumer(String group,
-                                       String consumerId) {
-        if (group == null
-                || consumerId == null) {
+    /**
+     * Remove the consumer from its group and return the removed consumer object;
+     * if it was the last consumer, the group object and its balance-set entry are removed too
+     *
+     * @param group group name of consumer
+     * @param consumerId consumer id
+     * @return the removed ConsumerInfo, or null if the arguments are null or the group is unknown
+     */
+    public ConsumerInfo removeConsumer(String group, String consumerId) {
+        if (group == null || consumerId == null) {
             return null;
         }
         ConsumerInfo consumer = null;
+        Integer lid = null;
         try {
-            rwLock.writeLock().lock();
-            ConsumerBandInfo consumeBandInfo =
-                    groupInfoMap.get(group);
-            if (consumeBandInfo != null) {
-                consumer = consumeBandInfo.removeConsumer(consumerId);
-                if (consumeBandInfo.getGroupCnt() == 0) {
+            lid = groupRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(group), true);
+            ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+            if (consumeGroupInfo != null) {
+                consumer = consumeGroupInfo.removeConsumer(consumerId);
+                if (consumeGroupInfo.isGroupEmpty()) {
                     groupInfoMap.remove(group);
+                    if (consumeGroupInfo.isClientBalance()) {
+                        clientBalanceGroupSet.remove(group);
+                    } else {
+                        serverBalanceGroupSet.remove(group);
+                    }
                 }
             }
             consumerIndexMap.remove(consumerId);
+        } catch (IOException e) {
+            logger.warn("Failed to lock.", e);
         } finally {
-            rwLock.writeLock().unlock();
-        }
-        return consumer;
-    }
-
-    public Tuple2<String, ConsumerInfo> getConsumeTupleInfo(String consumerId) {
-        try {
-            rwLock.readLock().lock();
-            ConsumerInfo consumerInfo = null;
-            String groupName = consumerIndexMap.get(consumerId);
-            if (groupName != null) {
-                ConsumerBandInfo consumeBandInfo = groupInfoMap.get(groupName);
-                if (consumeBandInfo != null) {
-                    consumerInfo = consumeBandInfo.getConsumerInfo(consumerId);
-                }
+            if (lid != null) {
+                groupRowLock.releaseRowLock(lid);
             }
-            return new Tuple2<>(groupName, consumerInfo);
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    public List<String> getAllGroup() {
-        try {
-            rwLock.readLock().lock();
-            if (groupInfoMap.isEmpty()) {
-                return Collections.emptyList();
-            } else {
-                List<String> groupList =
-                        new ArrayList<>(groupInfoMap.size());
-                groupList.addAll(groupInfoMap.keySet());
-                return groupList;
-            }
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    public int getConsumerCnt(String group) {
-        int count = 0;
-        if (group == null) {
-            return 0;
-        }
-        try {
-            rwLock.readLock().lock();
-            ConsumerBandInfo oldConsumeBandInfo =
-                    groupInfoMap.get(group);
-            if (oldConsumeBandInfo != null) {
-                count = oldConsumeBandInfo.getGroupCnt();
-            }
-        } finally {
-            rwLock.readLock().unlock();
         }
-        return count;
-    }
-
-    public void clear() {
-        consumerIndexMap.clear();
-        groupInfoMap.clear();
+        return consumer;
     }
-
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java
new file mode 100644
index 0000000..766af32
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
+
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+
+public class TopicConfigInfo implements Comparable<TopicConfigInfo> {
+    private final int brokerId;
+    private final String topicName;
+    private final int numStore;
+    private final int numPart;
+    private boolean acceptSub;
+
+    public TopicConfigInfo(TopicDeployEntity topicDeployEntity) {
+        this.brokerId = topicDeployEntity.getBrokerId();
+        this.topicName = topicDeployEntity.getTopicName();
+        this.numStore = topicDeployEntity.getNumTopicStores();
+        this.numPart = topicDeployEntity.getNumPartitions();
+        this.acceptSub = false;
+    }
+
+    public int getBrokerId() {
+        return brokerId;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public int getNumStore() {
+        return numStore;
+    }
+
+    public void setAcceptSub(boolean acceptSub) {
+        this.acceptSub = acceptSub;
+    }
+
+    public boolean isAcceptSub() {
+        return acceptSub;
+    }
+
+    public int getNumPart() {
+        return numPart;
+    }
+
+    @Override
+    public int compareTo(TopicConfigInfo o) {
+        return Integer.compare(this.brokerId, o.brokerId);
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Master.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Master.java
index 4aa633a..e5d6545 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Master.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Master.java
@@ -23,13 +23,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.servlet.http.HttpServletRequest;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.tubemq.corebase.TokenConstants;
 import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
 import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.corebase.cluster.ProducerInfo;
 import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
-import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.apache.inlong.tubemq.corerpc.exception.StandbyException;
@@ -98,21 +97,26 @@ public class Master implements Action {
         ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
         String group = req.getParameter("group");
         if (group != null) {
-            List<ConsumerInfo> consumerList = consumerHolder.getConsumerList(group);
             int index = 1;
-            if (consumerList != null && !consumerList.isEmpty()) {
-                Collections.sort(consumerList);
-                for (ConsumerInfo consumer : consumerList) {
-                    sBuilder.append(index).append(". ").append(consumer.toString()).append("\n");
-                    index++;
-                }
-            } else {
-                sBuilder.append("No such group.\n\nCurrent all groups");
-                List<String> groupList = consumerHolder.getAllGroup();
-                sBuilder.append("(").append(groupList.size()).append("):\n");
+            List<String> consumerViewInfos =
+                    consumerHolder.getConsumerViewList(group);
+            if (CollectionUtils.isEmpty(consumerViewInfos)) {
+                List<String> groupList = consumerHolder.getAllGroupName();
+                sBuilder.append("No such group.\n\nCurrent all groups(")
+                        .append(groupList.size()).append("):\n");
                 for (String currGroup : groupList) {
                     sBuilder.append(currGroup).append("\n");
                 }
+            } else {
+                Collections.sort(consumerViewInfos);
+                for (String consumerViewInfo : consumerViewInfos) {
+                    if (consumerViewInfo == null) {
+                        continue;
+                    }
+                    sBuilder.append(index).append(". ")
+                            .append(consumerViewInfo).append("\n");
+                    index++;
+                }
             }
         }
     }
@@ -128,20 +132,27 @@ public class Master implements Action {
         ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
         String group = req.getParameter("group");
         if (group != null) {
-            List<ConsumerInfo> consumerList = consumerHolder.getConsumerList(group);
-            if (consumerList != null && !consumerList.isEmpty()) {
-                Collections.sort(consumerList);
+            List<Tuple2<String, Boolean>> consumerList =
+                    consumerHolder.getConsumerIdAndTlsInfos(group);
+            if (CollectionUtils.isEmpty(consumerList)) {
+                List<String> groupList = consumerHolder.getAllGroupName();
+                sBuilder.append("No such group.\n\nCurrent all group(")
+                        .append(groupList.size()).append("):\n");
+                for (String currGroup : groupList) {
+                    sBuilder.append(currGroup).append("\n");
+                }
+            } else {
                 sBuilder.append("\n########################## Subscribe Relationship ############################\n\n");
                 Map<String, Map<String, Map<String, Partition>>> currentSubInfoMap =
                         master.getCurrentSubInfoMap();
                 for (int i = 0; i < consumerList.size(); i++) {
-                    ConsumerInfo consumer = consumerList.get(i);
+                    Tuple2<String, Boolean> consumer = consumerList.get(i);
                     sBuilder.append("*************** ").append(i + 1)
-                            .append(". ").append(consumer.getConsumerId())
-                            .append("#isOverTLS=").append(consumer.isOverTLS())
+                            .append(". ").append(consumer.getF0())
+                            .append("#isOverTLS=").append(consumer.getF1())
                             .append(" ***************");
                     Map<String, Map<String, Partition>> topicSubMap =
-                            currentSubInfoMap.get(consumer.getConsumerId());
+                            currentSubInfoMap.get(consumer.getF0());
                     if (topicSubMap != null) {
                         int totalSize = 0;
                         for (Map.Entry<String, Map<String, Partition>> entry : topicSubMap.entrySet()) {
@@ -152,7 +163,7 @@ public class Master implements Action {
                             Map<String, Partition> partMap = entry.getValue();
                             if (partMap != null) {
                                 for (Partition part : partMap.values()) {
-                                    sBuilder.append(consumer.getConsumerId())
+                                    sBuilder.append(consumer.getF0())
                                             .append("#").append(part.toString()).append("\n");
                                 }
                             }
@@ -160,13 +171,6 @@ public class Master implements Action {
                     }
                     sBuilder.append("\n\n");
                 }
-            } else {
-                sBuilder.append("No such group.\n\nCurrent all group");
-                List<String> groupList = consumerHolder.getAllGroup();
-                sBuilder.append("(").append(groupList.size()).append("):\n");
-                for (String currGroup : groupList) {
-                    sBuilder.append(currGroup).append("\n");
-                }
             }
         }
     }
@@ -192,7 +196,7 @@ public class Master implements Action {
             if (topic != null) {
                 TopicPSInfoManager topicPSInfoManager =
                         master.getTopicPSInfoManager();
-                ConcurrentHashSet<String> producerSet =
+                Set<String> producerSet =
                         topicPSInfoManager.getTopicPubInfo(topic);
                 if (producerSet != null && !producerSet.isEmpty()) {
                     int index = 1;
@@ -279,7 +283,8 @@ public class Master implements Action {
      */
     private void getTopicPubInfo(final HttpServletRequest req, StringBuilder sBuilder) {
         String topic = req.getParameter("topic");
-        Set<String> producerIds = master.getTopicPSInfoManager().getTopicPubInfo(topic);
+        Set<String> producerIds =
+                master.getTopicPSInfoManager().getTopicPubInfo(topic);
         if (producerIds != null && !producerIds.isEmpty()) {
             for (String producerId : producerIds) {
                 sBuilder.append(producerId).append("\n");
@@ -301,27 +306,38 @@ public class Master implements Action {
         Map<String, Map<String, Map<String, Partition>>> currentSubInfoMap =
                 master.getCurrentSubInfoMap();
         int currPartSize = 0;
-        List<String> groupList = consumerHolder.getAllGroup();
-        Tuple2<Set<String>, List<ConsumerInfo>> queryInfo = new Tuple2<>();
+        Set<String> topicSet;
+        List<Partition> partList;
+        List<String> consumerIdList;
+        Map<String, Partition> topicSubInfoMap;
+        Map<String, Map<String, Partition>> consumerSubInfoMap;
+        List<String> groupList = consumerHolder.getAllServerBalanceGroups();
         for (String group : groupList) {
-            if (!consumerHolder.getGroupTopicSetAndConsumerInfos(group, queryInfo)) {
+            if (group == null) {
                 continue;
             }
-            for (String topic : queryInfo.getF0()) {
+            topicSet = consumerHolder.getGroupTopicSet(group);
+            for (String topic : topicSet) {
+                if (topic == null) {
+                    continue;
+                }
                 currPartSize = 0;
-                for (ConsumerInfo consumer : queryInfo.getF1()) {
-                    Map<String, Map<String, Partition>> consumerSubInfoMap =
-                            currentSubInfoMap.get(consumer.getConsumerId());
-                    if (consumerSubInfoMap != null) {
-                        Map<String, Partition> topicSubInfoMap =
-                                consumerSubInfoMap.get(topic);
-                        if (topicSubInfoMap != null) {
-                            currPartSize += topicSubInfoMap.size();
+                consumerIdList = consumerHolder.getConsumerIdList(group);
+                if (CollectionUtils.isNotEmpty(consumerIdList)) {
+                    for (String consumerId : consumerIdList) {
+                        if (consumerId == null) {
+                            continue;
+                        }
+                        consumerSubInfoMap = currentSubInfoMap.get(consumerId);
+                        if (consumerSubInfoMap != null) {
+                            topicSubInfoMap = consumerSubInfoMap.get(topic);
+                            if (topicSubInfoMap != null) {
+                                currPartSize += topicSubInfoMap.size();
+                            }
                         }
                     }
                 }
-                List<Partition> partList =
-                        brokerRunManager.getSubBrokerAcceptSubParts(topic);
+                partList = brokerRunManager.getSubBrokerAcceptSubParts(topic);
                 if (currPartSize != partList.size()) {
                     sBuilder.append(group).append(":").append(topic).append("\n");
                 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 7d628cd..7aec03b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -32,7 +32,7 @@ import org.apache.inlong.tubemq.server.master.TMaster;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
-import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
 import org.slf4j.Logger;
@@ -724,16 +724,16 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
         Set<String> consumerIdSet = (Set<String>) result.getRetData();
         ConsumerInfoHolder consumerInfoHolder =
                 master.getConsumerHolder();
-        ConsumerBandInfo consumerBandInfo =
-                consumerInfoHolder.getConsumerBandInfo(groupName);
-        if (consumerBandInfo == null) {
+        ConsumeGroupInfo consumeGroupInfo =
+                consumerInfoHolder.getConsumeGroupInfo(groupName);
+        if (consumeGroupInfo == null) {
             String errInfo = sBuffer.append("The group(")
                     .append(groupName).append(") not online!").toString();
             sBuffer.delete(0, sBuffer.length());
             WebParameterUtils.buildFailResult(sBuffer, errInfo);
             return sBuffer;
         }
-        Map<String, NodeRebInfo> nodeRebInfoMap = consumerBandInfo.getRebalanceMap();
+        Map<String, NodeRebInfo> nodeRebInfoMap = consumeGroupInfo.getBalanceMap();
         for (String consumerId : consumerIdSet) {
             if (nodeRebInfoMap.containsKey(consumerId)) {
                 String errInfo = sBuffer.append("Duplicated set for consumerId(")
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index bf7a3cb..10791a0 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -27,15 +27,16 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.servlet.http.HttpServletRequest;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
+
 import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
-import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
 import org.apache.inlong.tubemq.server.master.TMaster;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
-import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeType;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
 
@@ -96,25 +97,23 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
                 topicPSInfoManager.getGroupSetWithSubTopic(inGroupNameSet, topicNameSet);
         int totalCnt = 0;
         int topicCnt = 0;
-        Tuple2<Set<String>, Integer> queryInfo = new Tuple2<>();
         ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
         for (String group : queryGroupSet) {
-            if (!consumerHolder.getGroupTopicSetAndClientCnt(group, queryInfo)) {
-                continue;
-            }
             if (totalCnt++ > 0) {
                 sBuffer.append(",");
             }
             sBuffer.append("{\"consumeGroup\":\"").append(group).append("\",\"topicSet\":[");
             topicCnt = 0;
-            for (String tmpTopic : queryInfo.getF0()) {
+            Set<String> topicSet = consumerHolder.getGroupTopicSet(group);
+            int consumerCnt = consumerHolder.getConsumerCnt(group);
+            for (String tmpTopic : topicSet) {
                 if (topicCnt++ > 0) {
                     sBuffer.append(",");
                 }
                 sBuffer.append("\"").append(tmpTopic).append("\"");
             }
-            sBuffer.append("],\"consumerNum\":").append(queryInfo.getF1()).append("}");
+            sBuffer.append("],\"consumerNum\":").append(consumerCnt).append("}");
         }
         WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
         return sBuffer;
@@ -143,7 +142,7 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
         }
         String strConsumeGroup = (String) result.getRetData();
         try {
-            boolean isBandConsume = false;
+            ConsumeType consumeType = ConsumeType.CONSUME_NORMAL;
             boolean isNotAllocate = false;
             boolean isSelectBig = true;
             String sessionKey = "";
@@ -154,37 +153,37 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
             int confBClientRate = -2;
             int curBClientRate = -2;
             int minRequireClientCnt = -2;
-            int rebalanceStatus = -2;
+            int balanceStatus = -2;
             Set<String> topicSet = new HashSet<>();
             List<ConsumerInfo> consumerList = new ArrayList<>();
             Map<String, NodeRebInfo> nodeRebInfoMap = new ConcurrentHashMap<>();
             Map<String, TreeSet<String>> existedTopicConditions = new HashMap<>();
             ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
-            ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(strConsumeGroup);
-            if (consumerBandInfo != null) {
-                if (consumerBandInfo.getTopicSet() != null) {
-                    topicSet = consumerBandInfo.getTopicSet();
+            ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(strConsumeGroup);
+            if (consumeGroupInfo != null) {
+                if (consumeGroupInfo.getTopicSet() != null) {
+                    topicSet = consumeGroupInfo.getTopicSet();
                 }
-                if (consumerBandInfo.getConsumerInfoList() != null) {
-                    consumerList = consumerBandInfo.getConsumerInfoList();
+                if (consumeGroupInfo.getConsumerInfoList() != null) {
+                    consumerList = consumeGroupInfo.getConsumerInfoList();
                 }
-                if (consumerBandInfo.getTopicConditions() != null) {
-                    existedTopicConditions = consumerBandInfo.getTopicConditions();
+                if (consumeGroupInfo.getTopicConditions() != null) {
+                    existedTopicConditions = consumeGroupInfo.getTopicConditions();
                 }
-                nodeRebInfoMap = consumerBandInfo.getRebalanceMap();
-                isBandConsume = consumerBandInfo.isBandConsume();
-                rebalanceStatus = consumerBandInfo.getRebalanceCheckStatus();
-                defBClientRate = consumerBandInfo.getDefBClientRate();
-                confBClientRate = consumerBandInfo.getConfBClientRate();
-                curBClientRate = consumerBandInfo.getCurBClientRate();
-                minRequireClientCnt = consumerBandInfo.getMinRequireClientCnt();
-                if (isBandConsume) {
-                    isNotAllocate = consumerBandInfo.isNotAllocate();
-                    isSelectBig = consumerBandInfo.isSelectedBig();
-                    sessionKey = consumerBandInfo.getSessionKey();
-                    reqSourceCount = consumerBandInfo.getSourceCount();
-                    curSourceCount = consumerBandInfo.getGroupCnt();
-                    rebalanceCheckTime = consumerBandInfo.getCurCheckCycle();
+                nodeRebInfoMap = consumeGroupInfo.getBalanceMap();
+                consumeType = consumeGroupInfo.getConsumeType();
+                balanceStatus = consumeGroupInfo.getBalanceChkStatus();
+                defBClientRate = consumerHolder.getDefResourceRate();
+                confBClientRate = consumeGroupInfo.getConfResourceRate();
+                curBClientRate = consumeGroupInfo.getCurResourceRate();
+                minRequireClientCnt = consumeGroupInfo.getMinReqClientCnt();
+                if (consumeType == ConsumeType.CONSUME_BAND) {
+                    isNotAllocate = consumeGroupInfo.isNotAllocate();
+                    isSelectBig = consumeGroupInfo.isSelectedBig();
+                    sessionKey = consumeGroupInfo.getSessionKey();
+                    reqSourceCount = consumeGroupInfo.getSourceCount();
+                    curSourceCount = consumeGroupInfo.getGroupCnt();
+                    rebalanceCheckTime = consumeGroupInfo.getCurCheckCycle();
                 }
             }
             sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"")
@@ -205,9 +204,9 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
                 sBuffer.append("\"").append(entry.getKey()).append("\":");
                 sBuffer = entry.getValue().toJsonString(sBuffer);
             }
-            sBuffer.append("},\"isBandConsume\":").append(isBandConsume);
+            sBuffer.append("},\"isBandConsume\":\"").append(consumeType.getName()).append("\"");
             // Append band consume info
-            if (isBandConsume) {
+            if (consumeType == ConsumeType.CONSUME_BAND) {
                 sBuffer.append(",\"isNotAllocate\":").append(isNotAllocate)
                         .append(",\"sessionKey\":\"").append(sessionKey)
                         .append("\",\"isSelectBig\":").append(isSelectBig)
@@ -216,9 +215,9 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
                         .append(",\"rebalanceCheckTime\":").append(rebalanceCheckTime);
             }
             sBuffer.append(",\"rebInfo\":{");
-            if (rebalanceStatus == -2) {
+            if (balanceStatus == -2) {
                 sBuffer.append("\"isRebalanced\":false");
-            } else if (rebalanceStatus == 0) {
+            } else if (balanceStatus == 0) {
                 sBuffer.append("\"isRebalanced\":true,\"checkPasted\":false")
                         .append(",\"defBClientRate\":").append(defBClientRate)
                         .append(",\"confBClientRate\":").append(confBClientRate)
@@ -252,7 +251,7 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
             }
             sBuffer.append("}");
             // Append consumer info of the group
-            getConsumerInfoList(consumerList, isBandConsume, sBuffer);
+            getConsumerInfoList(consumerList, consumeType, sBuffer);
             sBuffer.append("}");
         } catch (Exception e) {
             sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
@@ -265,11 +264,11 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
      * Private method to append consumer info of the give list to a string builder
      *
      * @param consumerList  consumer list
-     * @param isBandConsume whether bound consume
+     * @param consumeType   consume type
      * @param strBuffer     string buffer
      */
     private void getConsumerInfoList(final List<ConsumerInfo> consumerList,
-                                     boolean isBandConsume, final StringBuilder strBuffer) {
+                                     ConsumeType consumeType, final StringBuilder strBuffer) {
         strBuffer.append(",\"data\":[");
         if (!consumerList.isEmpty()) {
             Collections.sort(consumerList);
@@ -285,7 +284,7 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
                 }
                 strBuffer.append("{\"consumerId\":\"").append(consumer.getConsumerId())
                         .append("\"").append(",\"isOverTLS\":").append(consumer.isOverTLS());
-                if (isBandConsume) {
+                if (consumeType == ConsumeType.CONSUME_BAND) {
                     Map<String, Long> requiredPartition = consumer.getRequiredPartition();
                     if (requiredPartition == null || requiredPartition.isEmpty()) {
                         strBuffer.append(",\"initReSetPartCount\":0,\"initReSetPartInfo\":[]");
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
index ee4a443..b073e48 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
@@ -19,7 +19,7 @@ package org.apache.inlong.tubemq.server.master.nodemanage.nodebroker;
 
 import java.util.Arrays;
 import java.util.HashSet;
-import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
+import java.util.Set;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -40,17 +40,21 @@ public class TopicPSInfoManagerTest {
 
     @Test
     public void topicSubInfo() {
-        ConcurrentHashSet<String> groupSet = new ConcurrentHashSet<>();
+        Set<String> groupSet = new HashSet<>();
         groupSet.add("group_001");
         groupSet.add("group_002");
         groupSet.add("group_003");
+        Set<String> topicSet = new HashSet<>();
+        topicSet.add("topic001");
+        for (String groupName : groupSet) {
+            topicPSInfoManager.addGroupSubTopicInfo(groupName, topicSet);
+        }
 
-        topicPSInfoManager.setTopicSubInfo("topic001", groupSet);
-        ConcurrentHashSet<String> gs1 = topicPSInfoManager.getTopicSubInfo("topic001");
+        Set<String> gs1 = topicPSInfoManager.getTopicSubInfo("topic001");
         Assert.assertEquals(3, gs1.size());
 
-        topicPSInfoManager.removeTopicSubInfo("topic001", "group_001");
-        topicPSInfoManager.removeTopicSubInfo("topic001", "group_002");
+        topicPSInfoManager.rmvGroupSubTopicInfo("group_001", topicSet);
+        topicPSInfoManager.rmvGroupSubTopicInfo("group_002", topicSet);
         gs1 = topicPSInfoManager.getTopicSubInfo("topic001");
         Assert.assertEquals(1, gs1.size());
     }
@@ -63,14 +67,14 @@ public class TopicPSInfoManagerTest {
         topicList.add("topic003");
 
         topicPSInfoManager.addProducerTopicPubInfo("producer_001", topicList);
-        ConcurrentHashSet<String> ti1 = topicPSInfoManager.getTopicPubInfo("topic001");
+        Set<String> ti1 = topicPSInfoManager.getTopicPubInfo("topic001");
         Assert.assertEquals(1, ti1.size());
         Assert.assertTrue(ti1.contains("producer_001"));
 
         topicPSInfoManager.rmvProducerTopicPubInfo("producer_001",
                 new HashSet<>(Arrays.asList("topic001", "topic002")));
 
-        ConcurrentHashSet<String> ti2 = topicPSInfoManager.getTopicPubInfo("topic003");
+        Set<String> ti2 = topicPSInfoManager.getTopicPubInfo("topic003");
         Assert.assertEquals(1, ti2.size());
         Assert.assertTrue(ti2.contains("producer_001"));
     }