You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/11 01:57:56 UTC

[inlong] branch master updated: [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0133149cd [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130)
0133149cd is described below

commit 0133149cdddab4478be5f45477c066f3d5c72a55
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Oct 11 09:57:50 2022 +0800

    [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130)
---
 .../{AllowedSetting.java => MaxMsgSizeHolder.java} |  26 +-
 .../tubemq/client/producer/ProducerManager.java    | 120 +++----
 .../client/producer/SimpleMessageProducer.java     |   4 +-
 .../tubemq/corebase/utils/DataConverterUtil.java   |  60 ++--
 .../inlong/tubemq/corebase/utils/Tuple3.java       |   9 +
 .../inlong/tubemq/corerpc/RemoteConErrStats.java   |   4 +-
 .../inlong/tubemq/corerpc/client/CallFuture.java   |   5 +-
 .../corerpc/codec/DataConverterUtilTest.java       |   6 +-
 .../inlong/tubemq/server/master/TMaster.java       | 392 +++++++++++----------
 .../nodemanage/nodebroker/BrokerAbnHolder.java     |   6 +-
 .../nodemanage/nodebroker/BrokerPSInfoHolder.java  |  30 +-
 .../nodemanage/nodebroker/BrokerRunManager.java    |   7 +-
 .../nodemanage/nodebroker/BrokerSyncData.java      |   1 -
 .../nodemanage/nodebroker/BrokerTopicInfoView.java |  44 ++-
 .../nodemanage/nodebroker/DefBrokerRunManager.java |  15 +-
 .../master/web/handler/WebMasterInfoHandler.java   |  37 +-
 .../master/web/handler/WebTopicDeployHandler.java  | 139 ++++----
 17 files changed, 485 insertions(+), 420 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java
similarity index 66%
rename from inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java
rename to inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java
index 22d02dc6b..80c84049b 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.tubemq.client.producer;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
@@ -24,16 +26,17 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
 import org.apache.inlong.tubemq.corebase.utils.SettingValidUtils;
 
 /**
- * The class class caches the dynamic settings
+ * The class caches the max msg size settings
  *  returned from the server.
  */
-public class AllowedSetting {
-    private AtomicLong configId =
+public class MaxMsgSizeHolder {
+    private final AtomicLong configId =
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
-    private AtomicInteger maxMsgSize =
+    private final AtomicInteger defMaxMsgSize =
             new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE);
+    private Map<String, Integer> topicMaxSizeInBMap = new ConcurrentHashMap<>();
 
-    public AllowedSetting() {
+    public MaxMsgSizeHolder() {
 
     }
 
@@ -44,18 +47,23 @@ public class AllowedSetting {
                 configId.set(allowedConfig.getConfigId());
             }
             if (allowedConfig.hasMaxMsgSize()
-                    && allowedConfig.getMaxMsgSize() != maxMsgSize.get()) {
-                maxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB(
+                    && allowedConfig.getMaxMsgSize() != defMaxMsgSize.get()) {
+                defMaxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB(
                         allowedConfig.getMaxMsgSize()));
             }
         }
     }
 
+    public void updTopicMaxSizeInB(Map<String, Integer> topicMaxSizeInBMap) {
+        this.topicMaxSizeInBMap = topicMaxSizeInBMap;
+    }
+
     public long getConfigId() {
         return configId.get();
     }
 
-    public int getMaxMsgSize() {
-        return maxMsgSize.get();
+    public int getDefMaxMsgSize(String topicName) {
+        Integer maxMsgSizeInB = topicMaxSizeInBMap.get(topicName);
+        return maxMsgSizeInB == null ? defMaxMsgSize.get() : maxMsgSizeInB;
     }
 }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index afe929f77..6ce1bfa06 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
@@ -52,6 +52,7 @@ import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
 import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.apache.inlong.tubemq.corerpc.RpcConfig;
 import org.apache.inlong.tubemq.corerpc.RpcConstants;
 import org.apache.inlong.tubemq.corerpc.RpcServiceFactory;
@@ -81,8 +82,8 @@ public class ProducerManager {
     private final ScheduledExecutorService heartbeatService;
     private final AtomicLong visitToken =
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
-    private final AllowedSetting allowedSetting =
-            new AllowedSetting();
+    private final MaxMsgSizeHolder msgSizeHolder =
+            new MaxMsgSizeHolder();
     private final AtomicReference<String> authAuthorizedTokenRef =
             new AtomicReference<>("");
     private final ClientAuthenticateHandler authenticateHandler =
@@ -341,10 +342,11 @@ public class ProducerManager {
     /**
      * Get allowed message size.
      *
+     * @param topicName  the topic name
      * @return max allowed message size
      */
-    public int getMaxMsgSize() {
-        return allowedSetting.getMaxMsgSize();
+    public int getMaxMsgSize(String topicName) {
+        return msgSizeHolder.getDefMaxMsgSize(topicName);
     }
 
     /**
@@ -516,27 +518,32 @@ public class ProducerManager {
         return builder.build();
     }
 
-    private void updateTopicPartitions(List<TopicInfo> topicInfoList) {
+    private void updateTopicConfigure(
+            Tuple2<Map<String, Integer>, List<TopicInfo>> topicInfoTuple) {
+        int baseValue;
+        Partition tmpPart;
+        List<Partition> partList;
+        Map<Integer, List<Partition>> brokerPartList;
+        // update topic max msg size
+        msgSizeHolder.updTopicMaxSizeInB(topicInfoTuple.getF0());
+        // update topic Partition info
         Map<String, Map<Integer, List<Partition>>> partitionListMap =
                 new ConcurrentHashMap<>();
-        for (TopicInfo topicInfo : topicInfoList) {
-            Map<Integer, List<Partition>> brokerPartList =
+        for (TopicInfo topicInfo : topicInfoTuple.getF1()) {
+            brokerPartList =
                     partitionListMap.get(topicInfo.getTopic());
             if (brokerPartList == null) {
                 brokerPartList = new ConcurrentHashMap<>();
                 partitionListMap.put(topicInfo.getTopic(), brokerPartList);
             }
             for (int j = 0; j < topicInfo.getTopicStoreNum(); j++) {
-                int baseValue = j * TBaseConstants.META_STORE_INS_BASE;
+                baseValue = j * TBaseConstants.META_STORE_INS_BASE;
                 for (int i = 0; i < topicInfo.getPartitionNum(); i++) {
-                    Partition part =
-                            new Partition(topicInfo.getBroker(), topicInfo.getTopic(), baseValue + i);
-                    List<Partition> partList = brokerPartList.get(part.getBrokerId());
-                    if (partList == null) {
-                        partList = new ArrayList<>();
-                        brokerPartList.put(part.getBrokerId(), partList);
-                    }
-                    partList.add(part);
+                    tmpPart = new Partition(topicInfo.getBroker(),
+                            topicInfo.getTopic(), baseValue + i);
+                    partList = brokerPartList.computeIfAbsent(
+                            tmpPart.getBrokerId(), k -> new ArrayList<>());
+                    partList.add(tmpPart);
                 }
             }
         }
@@ -595,20 +602,36 @@ public class ProducerManager {
             processAuthorizedToken(response.getAuthorizedInfo());
         }
         if (response.hasAppdConfig()) {
-            procAllowedConfig4P(response.getAppdConfig());
+            msgSizeHolder.updAllowedSetting(response.getAppdConfig());
         }
     }
 
-    private void processHeartBeatSyncInfo(ClientMaster.HeartResponseM2P response) {
+    private void processHeartBeatSyncInfo(ClientMaster.HeartResponseM2P response,
+                                          StringBuilder strBuff) {
         if (response.hasRequireAuth()) {
             nextWithAuthInfo2M.set(response.getRequireAuth());
         }
         if (response.hasAppdConfig()) {
-            procAllowedConfig4P(response.getAppdConfig());
+            msgSizeHolder.updAllowedSetting(response.getAppdConfig());
         }
         if (response.hasAuthorizedInfo()) {
             processAuthorizedToken(response.getAuthorizedInfo());
         }
+        if (response.getErrCode() == TErrCodeConstants.NOT_READY) {
+            lastHeartbeatTime = System.currentTimeMillis();
+            return;
+        }
+        if (response.getBrokerCheckSum() != brokerInfoCheckSum) {
+            updateBrokerInfoList(false, response.getBrokerInfosList(),
+                    response.getBrokerCheckSum(), strBuff);
+        }
+        if (response.getTopicInfosList().isEmpty()
+                && System.currentTimeMillis() - lastEmptyTopicPrintTime > 60000) {
+            logger.warn("[Heartbeat Update] found empty topicList update!");
+            lastEmptyTopicPrintTime = System.currentTimeMillis();
+        }
+        updateTopicConfigure(DataConverterUtil
+                .convertTopicInfo(brokersMap, response.getTopicInfosList()));
     }
 
     private void processAuthorizedToken(ClientMaster.MasterAuthorizedInfo inAuthorizedTokenInfo) {
@@ -654,27 +677,20 @@ public class ProducerManager {
     private ClientMaster.ApprovedClientConfig.Builder buildAllowedConfig4P() {
         ClientMaster.ApprovedClientConfig.Builder appdConfig =
                 ClientMaster.ApprovedClientConfig.newBuilder();
-        appdConfig.setConfigId(allowedSetting.getConfigId());
+        appdConfig.setConfigId(msgSizeHolder.getConfigId());
         return appdConfig;
     }
 
-    // set allowed configure info
-    private void procAllowedConfig4P(ClientMaster.ApprovedClientConfig allowedConfig) {
-        if (allowedConfig != null) {
-            allowedSetting.updAllowedSetting(allowedConfig);
-        }
-    }
-
     // #lizard forgives
     private class ProducerHeartbeatTask implements Runnable {
         @Override
         public void run() {
-            StringBuilder sBuilder = new StringBuilder(512);
+            StringBuilder strBuff = new StringBuilder(512);
             while (!heartBeatStatus.compareAndSet(0, 1)) {
                 ThreadUtils.sleep(100);
             }
             // print metrics information
-            clientStatsInfo.selfPrintStatsInfo(false, true, sBuilder);
+            clientStatsInfo.selfPrintStatsInfo(false, true, strBuff);
             // check whether public topics
             if (publishTopics.isEmpty()) {
                 return;
@@ -689,71 +705,49 @@ public class ProducerManager {
                         clientStatsInfo.bookHB2MasterException();
                         logger.error("[Heartbeat Failed] receive null HeartResponseM2P response!");
                     } else {
-                        logger.error(sBuilder.append("[Heartbeat Failed] ")
+                        logger.error(strBuff.append("[Heartbeat Failed] ")
                                 .append(response.getErrMsg()).toString());
-                        sBuilder.delete(0, sBuilder.length());
+                        strBuff.delete(0, strBuff.length());
                         if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
                             clientStatsInfo.bookHB2MasterTimeout();
                             try {
                                 register2Master();
                             } catch (Throwable ee) {
-                                logger.error(sBuilder
+                                logger.error(strBuff
                                     .append("[Heartbeat Failed] re-register failure, error is ")
                                     .append(ee.getMessage()).toString());
-                                sBuilder.delete(0, sBuilder.length());
+                                strBuff.delete(0, strBuff.length());
                             }
                         } else {
                             clientStatsInfo.bookHB2MasterException();
                             if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
-                                adjustHeartBeatPeriod("certificate failure", sBuilder);
+                                adjustHeartBeatPeriod("certificate failure", strBuff);
                             }
                         }
                     }
                     return;
                 }
-                processHeartBeatSyncInfo(response);
-                if (response.getErrCode() == TErrCodeConstants.NOT_READY) {
-                    lastHeartbeatTime = System.currentTimeMillis();
-                    return;
-                }
-                if (response.getBrokerCheckSum() != brokerInfoCheckSum) {
-                    updateBrokerInfoList(false, response.getBrokerInfosList(),
-                            response.getBrokerCheckSum(), sBuilder);
-                }
-                if (response.getTopicInfosList() != null) {
-                    if (response.getTopicInfosList().isEmpty()
-                            && System.currentTimeMillis() - lastEmptyTopicPrintTime > 60000) {
-                        logger.warn("[Heartbeat Update] found empty topicList update!");
-                        lastEmptyTopicPrintTime = System.currentTimeMillis();
-                    }
-                    updateTopicPartitions(DataConverterUtil
-                            .convertTopicInfo(brokersMap, response.getTopicInfosList()));
-                } else {
-                    logger.error(sBuilder
-                            .append("[Heartbeat Failed] Found brokerList or topicList is null, brokerList is ")
-                            .append(response.getBrokerInfosList() != null).toString());
-                    sBuilder.delete(0, sBuilder.length());
-                }
+                processHeartBeatSyncInfo(response, strBuff);
                 heartbeatRetryTimes = 0;
                 long currentTime = System.currentTimeMillis();
                 if ((currentTime - lastHeartbeatTime)
                         > (tubeClientConfig.getHeartbeatPeriodMs() * 4)) {
-                    logger.warn(sBuilder.append(producerId)
+                    logger.warn(strBuff.append(producerId)
                             .append(" heartbeat interval is too long, please check! Total time : ")
                             .append(currentTime - lastHeartbeatTime).toString());
-                    sBuilder.delete(0, sBuilder.length());
+                    strBuff.delete(0, strBuff.length());
                 }
                 lastHeartbeatTime = currentTime;
             } catch (Throwable e) {
-                sBuilder.delete(0, sBuilder.length());
+                strBuff.delete(0, strBuff.length());
                 if (!(e.getCause() != null
                         && e.getCause() instanceof ClientClosedException)) {
                     logger.error("Heartbeat failed,retry later.Reason:{}",
-                            sBuilder.append(e.getClass().getSimpleName())
+                            strBuff.append(e.getClass().getSimpleName())
                                     .append("#").append(e.getMessage()).toString());
-                    sBuilder.delete(0, sBuilder.length());
+                    strBuff.delete(0, strBuff.length());
                 }
-                adjustHeartBeatPeriod("heartbeat exception", sBuilder);
+                adjustHeartBeatPeriod("heartbeat exception", strBuff);
             } finally {
                 heartBeatStatus.compareAndSet(1, 0);
             }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
index 291c56e71..e049e9804 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
@@ -316,11 +316,11 @@ public class SimpleMessageProducer implements MessageProducer {
         }
         int msgSize = TStringUtils.isBlank(message.getAttribute())
                 ? message.getData().length : (message.getData().length + message.getAttribute().length());
-        if (msgSize > producerManager.getMaxMsgSize()) {
+        if (msgSize > producerManager.getMaxMsgSize(message.getTopic())) {
             throw new TubeClientException(new StringBuilder(512)
                     .append("Illegal parameter: over max message length for the total size of")
                     .append(" message data and attribute, allowed size is ")
-                    .append(producerManager.getMaxMsgSize())
+                    .append(producerManager.getMaxMsgSize(message.getTopic()))
                     .append(", message's real size is ").append(msgSize).toString());
         }
         if (isShutDown.get()) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
index 39b2c5b06..3d63c8e19 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
@@ -96,31 +96,46 @@ public class DataConverterUtil {
     /**
      * convert string info with a brokerInfo list to @link TopicInfo
      *
-     * @param brokerInfoMap
+     * @param brokerInfoMap  broker configure map
      * @param strTopicInfos return a list of TopicInfo
      */
-    public static List<TopicInfo> convertTopicInfo(Map<Integer, BrokerInfo> brokerInfoMap,
-                                                   List<String> strTopicInfos) {
+    public static Tuple2<Map<String, Integer>, List<TopicInfo>> convertTopicInfo(
+            Map<Integer, BrokerInfo> brokerInfoMap, List<String> strTopicInfos) {
         List<TopicInfo> topicList = new ArrayList<>();
-        if (strTopicInfos != null) {
-            for (String info : strTopicInfos) {
-                if (info != null) {
-                    info = info.trim();
-                    String[] strInfo = info.split(TokenConstants.SEGMENT_SEP);
-                    String[] strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP);
-                    for (String s : strTopicInfoSet) {
-                        String[] strTopicInfo = s.split(TokenConstants.ATTR_SEP);
-                        BrokerInfo brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0]));
-                        if (brokerInfo != null) {
-                            topicList.add(new TopicInfo(brokerInfo,
-                                    strInfo[0], Integer.parseInt(strTopicInfo[1]),
-                                    Integer.parseInt(strTopicInfo[2]), true, true));
-                        }
-                    }
+        Map<String, Integer> topicMaxSizeInBMap = new ConcurrentHashMap<>();
+        if (strTopicInfos == null || strTopicInfos.isEmpty()) {
+            return new Tuple2<>(topicMaxSizeInBMap, topicList);
+        }
+        String[] strInfo;
+        String[] strTopicInfoSet;
+        String[] strTopicInfo;
+        BrokerInfo brokerInfo;
+        for (String info : strTopicInfos) {
+            if (info == null || info.isEmpty()) {
+                continue;
+            }
+            info = info.trim();
+            strInfo = info.split(TokenConstants.SEGMENT_SEP, -1);
+            strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP);
+            for (String s : strTopicInfoSet) {
+                strTopicInfo = s.split(TokenConstants.ATTR_SEP);
+                brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0]));
+                if (brokerInfo != null) {
+                    topicList.add(new TopicInfo(brokerInfo,
+                            strInfo[0], Integer.parseInt(strTopicInfo[1]),
+                            Integer.parseInt(strTopicInfo[2]), true, true));
                 }
             }
+            if (strInfo.length == 2 || TStringUtils.isEmpty(strInfo[2])) {
+                continue;
+            }
+            try {
+                topicMaxSizeInBMap.put(strInfo[0], Integer.parseInt(strInfo[2]));
+            } catch (Throwable e) {
+                //
+            }
         }
-        return topicList;
+        return new Tuple2<>(topicMaxSizeInBMap, topicList);
     }
 
     /**
@@ -166,11 +181,8 @@ public class DataConverterUtil {
             }
             String topicName = strInfo[0].trim();
             String[] strCondInfo = strInfo[1].split(TokenConstants.ARRAY_SEP);
-            TreeSet<String> conditionSet = topicConditions.get(topicName);
-            if (conditionSet == null) {
-                conditionSet = new TreeSet<>();
-                topicConditions.put(topicName, conditionSet);
-            }
+            TreeSet<String> conditionSet =
+                    topicConditions.computeIfAbsent(topicName, k -> new TreeSet<>());
             for (String cond : strCondInfo) {
                 if (TStringUtils.isNotBlank(cond)) {
                     conditionSet.add(cond.trim());
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java
index 74e628625..2e857f4b6 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java
@@ -56,6 +56,15 @@ public class Tuple3<T0, T1, T2> {
         return f2;
     }
 
+    public void setF0AndF1(T0 value0, T1 value1) {
+        this.f0 = value0;
+        this.f1 = value1;
+    }
+
+    public void setF2(T2 value2) {
+        this.f2 = value2;
+    }
+
     public void setFieldsValue(T0 value0, T1 value1, T2 value2) {
         this.f0 = value0;
         this.f1 = value1;
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java
index deb16d692..35728d5c9 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java
@@ -22,8 +22,8 @@ import java.util.concurrent.atomic.AtomicLong;
 public class RemoteConErrStats {
     private long statisticDuration = 60000;
     private int maxConnAllowedFailCount = 5;
-    private AtomicLong errCounter = new AtomicLong(0);
-    private AtomicLong lastTimeStamp = new AtomicLong(0);
+    private final AtomicLong errCounter = new AtomicLong(0);
+    private final AtomicLong lastTimeStamp = new AtomicLong(0);
 
     public RemoteConErrStats(final long statisticDuration,
                              final int maxConnAllowedFailCount) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java
index 492017e7e..77837a28d 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
  * A Future implementation for RPCs.
  */
 public class CallFuture<T> implements Future<T>, Callback<T> {
+    private static final String errMsgInfo = "Wait response message timeout!";
     private final CountDownLatch latch = new CountDownLatch(1);
     private final Callback<T> chainedCallback;
     private T result = null;
@@ -131,7 +132,7 @@ public class CallFuture<T> implements Future<T>, Callback<T> {
             }
             return result;
         } else {
-            throw new TimeoutException();
+            throw new TimeoutException(errMsgInfo);
         }
     }
 
@@ -155,7 +156,7 @@ public class CallFuture<T> implements Future<T>, Callback<T> {
     public void await(long timeout, TimeUnit unit)
             throws InterruptedException, TimeoutException {
         if (!latch.await(timeout, unit)) {
-            throw new TimeoutException();
+            throw new TimeoutException(errMsgInfo);
         }
     }
 
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
index bc5e26d93..f7ef9576e 100644
--- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
@@ -27,6 +27,7 @@ import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.corebase.cluster.SubscribeInfo;
 import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
 import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.junit.Test;
 
 public class DataConverterUtilTest {
@@ -65,8 +66,9 @@ public class DataConverterUtilTest {
         strInfoList.clear();
         // topic#brokerId:partitionNum:topicStoreNum
         strInfoList.add("tube#0:10:5");
-        List<TopicInfo> topicList = DataConverterUtil.convertTopicInfo(brokerMap, strInfoList);
-        assertEquals("topic should be equal", topic, topicList.get(0));
+        Tuple2<Map<String, Integer>, List<TopicInfo>> topicConfTuple =
+                DataConverterUtil.convertTopicInfo(brokerMap, strInfoList);
+        assertEquals("topic should be equal", topic, topicConfTuple.getF1().get(0));
 
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index c18c54b22..2b802de98 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -82,6 +82,7 @@ import org.apache.inlong.tubemq.corebase.utils.OpsSyncInfo;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.corebase.utils.Tuple3;
 import org.apache.inlong.tubemq.corerpc.RpcConfig;
 import org.apache.inlong.tubemq.corerpc.RpcConstants;
 import org.apache.inlong.tubemq.corerpc.RpcServiceFactory;
@@ -352,13 +353,17 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         heartbeatManager.regProducerNode(producerId);
         producerHolder.setProducerInfo(producerId,
                 new HashSet<>(transTopicSet), hostName, overtls);
+        // get current configure information
         Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
                 brokerRunManager.getBrokerStaticInfo(overtls);
+        Tuple3<Long, Integer, Map<String, String>> prodTopicConfigTuple =
+                getTopicConfigureInfos(producerId, true);
         builder.setBrokerCheckSum(brokerStaticInfo.getF0());
         builder.addAllBrokerInfos(brokerStaticInfo.getF1().values());
-        builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build());
+        builder.setAuthorizedInfo(genAuthorizedInfo(
+                certResult.authorizedToken, false).build());
         ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
-                buildApprovedClientConfig(request.getAppdConfig());
+                buildApprovedClientConfig(request.getAppdConfig(), prodTopicConfigTuple);
         if (clientConfigBuilder != null) {
             builder.setAppdConfig(clientConfigBuilder);
         }
@@ -443,24 +448,30 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         topicPSInfoManager.addProducerTopicPubInfo(producerId, transTopicSet);
         producerHolder.updateProducerInfo(producerId,
                 transTopicSet, hostName, overtls);
-        Map<String, String> availTopicPartitions = getProducerTopicPartitionInfo(producerId);
-        builder.addAllTopicInfos(availTopicPartitions.values());
-        builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build());
+        // get current configure information and set
         Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
                 brokerRunManager.getBrokerStaticInfo(overtls);
+        final Tuple3<Long, Integer, Map<String, String>> prodTopicConfigTuple =
+                getTopicConfigureInfos(producerId, false);
+        builder.setAuthorizedInfo(genAuthorizedInfo(
+                certResult.authorizedToken, false).build());
         builder.setBrokerCheckSum(brokerStaticInfo.getF0());
         if (brokerStaticInfo.getF0() != inBrokerCheckSum) {
             builder.addAllBrokerInfos(brokerStaticInfo.getF1().values());
         }
+        if (prodTopicConfigTuple.getF2() != null) {
+            builder.addAllTopicInfos(prodTopicConfigTuple.getF2().values());
+        }
         ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
-                buildApprovedClientConfig(request.getAppdConfig());
+                buildApprovedClientConfig(request.getAppdConfig(), prodTopicConfigTuple);
         if (clientConfigBuilder != null) {
             builder.setAppdConfig(clientConfigBuilder);
         }
         if (logger.isDebugEnabled()) {
             logger.debug(strBuffer.append("[Push Producer's available topic count:]")
                     .append(producerId).append(TokenConstants.LOG_SEG_SEP)
-                    .append(availTopicPartitions.size()).toString());
+                    .append((prodTopicConfigTuple.getF2() == null)
+                            ? 0 : prodTopicConfigTuple.getF2().size()).toString());
         }
         builder.setSuccess(true);
         builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -564,6 +575,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             return builder.build();
         }
         final Set<String> reqTopicSet = (Set<String>) result.getRetData();
+        final Map<String, TreeSet<String>> reqTopicConditions =
+                DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
         String requiredParts = request.hasRequiredPartition() ? request.getRequiredPartition() : "";
         ConsumeType csmType = (request.hasRequireBound() && request.getRequireBound())
                 ? ConsumeType.CONSUME_BAND : ConsumeType.CONSUME_NORMAL;
@@ -576,8 +589,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             return builder.build();
         }
         Map<String, Long> requiredPartMap = (Map<String, Long>) paramCheckResult.checkData;
-        Map<String, TreeSet<String>> reqTopicConditions =
-                DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
         String sessionKey = request.hasSessionKey() ? request.getSessionKey() : "";
         long sessionTime = request.hasSessionTime()
                 ? request.getSessionTime() : System.currentTimeMillis();
@@ -642,20 +653,24 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             consumeGroupInfo = (ConsumeGroupInfo) paramCheckResult.checkData;
             topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
             if (CollectionUtils.isNotEmpty(subscribeList)) {
-                Map<String, Map<String, Partition>> topicPartSubMap =
-                        new HashMap<>();
+                int reportCnt = 0;
+                Map<String, Partition> partMap;
+                Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>();
                 currentSubInfo.put(consumerId, topicPartSubMap);
+                strBuffer.append("[SubInfo Report] client=").append(consumerId)
+                        .append(", subscribed partitions=[");
                 for (SubscribeInfo info : subscribeList) {
-                    Map<String, Partition> partMap = topicPartSubMap.get(info.getTopic());
-                    if (partMap == null) {
-                        partMap = new HashMap<>();
-                        topicPartSubMap.put(info.getTopic(), partMap);
-                    }
+                    partMap = topicPartSubMap.computeIfAbsent(
+                            info.getTopic(), k -> new HashMap<>());
                     partMap.put(info.getPartition().getPartitionKey(), info.getPartition());
-                    logger.info(strBuffer.append("[SubInfo Report]")
-                            .append(info.toString()).toString());
-                    strBuffer.delete(0, strBuffer.length());
+                    if (reportCnt++ > 0) {
+                        strBuffer.append(",");
+                    }
+                    strBuffer.append(info.getPartitionStr());
                 }
+                strBuffer.append("]");
+                logger.info(strBuffer.toString());
+                strBuffer.delete(0, strBuffer.length());
             }
             heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId));
         } catch (IOException e) {
@@ -1601,19 +1616,29 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
      * @param producerId
      * @return
      */
-    private Map<String, String> getProducerTopicPartitionInfo(String producerId) {
-        ProducerInfo producerInfo =
-                producerHolder.getProducerInfo(producerId);
+    private Tuple3<Long, Integer, Map<String, String>> getTopicConfigureInfos(String producerId,
+                                                                              boolean onlyMsgSize) {
+        Tuple3<Long, Integer, Map<String, String>> result = new Tuple3<>();
+        ClusterSettingEntity clusterEntity =
+                defMetaDataService.getClusterDefSetting(false);
+        result.setF0AndF1(clusterEntity.getSerialId(),
+                clusterEntity.getMaxMsgSizeInB());
+        if (onlyMsgSize) {
+            return result;
+        }
+        ProducerInfo producerInfo = producerHolder.getProducerInfo(producerId);
         if (producerInfo == null) {
-            return new HashMap<>();
+            return result;
         }
-        Set<String> producerInfoTopicSet =
-                producerInfo.getTopicSet();
-        if ((producerInfoTopicSet == null)
-                || (producerInfoTopicSet.isEmpty())) {
-            return new HashMap<>();
+        Set<String> publishedTopicSet = producerInfo.getTopicSet();
+        if ((publishedTopicSet == null)
+                || (publishedTopicSet.isEmpty())) {
+            return result;
         }
-        return brokerRunManager.getPubBrokerAcceptPubPartInfo(producerInfoTopicSet);
+        Map<String, Integer> topicAndSizeMap =
+                defMetaDataService.getMaxMsgSizeInBByTopics(result.getF1(), publishedTopicSet);
+        result.setF2(brokerRunManager.getPubBrokerAcceptPubPartInfo(topicAndSizeMap));
+        return result;
     }
 
     @Override
@@ -1642,12 +1667,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         final StringBuilder strBuffer = new StringBuilder(512);
         final long balanceId = idGenerator.incrementAndGet();
         if (defMetaDataService != null) {
-            logger.info(strBuffer.append("[Balance Start] ").append(balanceId)
+            logger.info(strBuffer.append("[Balance Status] ").append(balanceId)
                     .append(", isMaster=").append(defMetaDataService.isSelfMaster())
                     .append(", isPrimaryNodeActive=")
                     .append(defMetaDataService.isPrimaryNodeActive()).toString());
         } else {
-            logger.info(strBuffer.append("[Balance Start] ").append(balanceId)
+            logger.info(strBuffer.append("[Balance Status] ").append(balanceId)
                     .append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString());
         }
         strBuffer.delete(0, strBuffer.length());
@@ -1655,7 +1680,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         processClientBalanceMetaInfo(balanceId, strBuffer);
         // process server-balance
         processServerBalance(tMaster, balanceId, strBuffer);
-        logger.info(strBuffer.append("[Balance End] ").append(balanceId).toString());
     }
 
     private void processServerBalance(TMaster tMaster,
@@ -1663,7 +1687,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                                       StringBuilder sBuffer) {
         int curDoingTasks = this.curSvrBalanceParal.get();
         if (curDoingTasks > 0) {
-            logger.info(sBuffer.append("[Svr-Balance End] ").append(balanceId)
+            logger.info(sBuffer.append("[Svr-Balance Status] ").append(balanceId)
                     .append(" the Server-Balance has ").append(curDoingTasks)
                     .append(" task(s) in progress!").toString());
             sBuffer.delete(0, sBuffer.length());
@@ -1704,13 +1728,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                             if (subGroups.isEmpty()) {
                                 return;
                             }
+                            final StringBuilder strBuffer = new StringBuilder(512);
                             // first process reset rebalance task;
                             try {
                                 tMaster.processResetbalance(balanceId,
-                                        isStartBalance, subGroups);
+                                        isStartBalance, subGroups, strBuffer);
                             } catch (Throwable e) {
                                 logger.warn(new StringBuilder(1024)
-                                        .append("[Svr-Balance processor] Error during reset-reb,")
+                                        .append("[Svr-Balance Status] Error during reset-reb,")
                                         .append("the groups that may be affected are ")
                                         .append(subGroups).append(",error is ")
                                         .append(e).toString());
@@ -1721,16 +1746,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                             // second process normal balance task;
                             try {
                                 tMaster.processRebalance(balanceId,
-                                        isStartBalance, subGroups);
+                                        isStartBalance, subGroups, strBuffer);
                             } catch (Throwable e) {
                                 logger.warn(new StringBuilder(1024)
-                                        .append("[Svr-Balance processor] Error during normal-reb,")
+                                        .append("[Svr-Balance Status] Error during normal-reb,")
                                         .append("the groups that may be affected are ")
                                         .append(subGroups).append(",error is ")
                                         .append(e).toString());
                             }
                         } catch (Throwable e) {
-                            logger.warn("[Svr-Balance processor] Error during process", e);
+                            logger.warn("[Svr-Balance Status] Error during process", e);
                         } finally {
                             if (curSvrBalanceParal.decrementAndGet() == 0) {
                                 MasterSrvStatsHolder.updSvrBalanceDurations(
@@ -1742,14 +1767,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             }
         }
         startupBalance = false;
-        logger.info(sBuffer.append("[Svr-Balance End] ").append(balanceId).toString());
-        sBuffer.delete(0, sBuffer.length());
     }
 
     private void processClientBalanceMetaInfo(long balanceId, StringBuilder sBuffer) {
         int curDoingTasks = this.curCltBalanceParal.get();
         if (curDoingTasks > 0) {
-            logger.info(sBuffer.append("[Clt-Balance End] ").append(balanceId)
+            logger.info(sBuffer.append("[Clt-Balance Status] ").append(balanceId)
                     .append(" the Client-Balance has ").append(curDoingTasks)
                     .append(" task(s) in progress!").toString());
             sBuffer.delete(0, sBuffer.length());
@@ -1794,7 +1817,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                                 freshTopicMetaInfo(consumeGroupInfo, sBuffer2);
                             }
                         } catch (Throwable e) {
-                            logger.warn("[Clt-Balance processor] Error during process", e);
+                            logger.warn("[Clt-Balance Status] Error during process", e);
                         } finally {
                             curCltBalanceParal.decrementAndGet();
                         }
@@ -1802,8 +1825,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 });
             }
         }
-        logger.info(sBuffer.append("[Clt-Balance End] ").append(balanceId).toString());
-        sBuffer.delete(0, sBuffer.length());
     }
 
     public void freshTopicMetaInfo(ConsumeGroupInfo consumeGroupInfo, StringBuilder sBuffer) {
@@ -1826,7 +1847,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         }
         int count = 0;
         int statusId = 0;
-        TopicInfo topicInfo;
+        Tuple2<Boolean, TopicInfo> topicSubInfo = new Tuple2<>();
         Set<String> fbdTopicSet =
                 defMetaDataService.getDisableTopicByGroupName(groupInfo.getGroupName());
         for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) {
@@ -1834,14 +1855,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 continue;
             }
             count = 0;
-            statusId = 0;
             for (TopicDeployEntity deployInfo : entry.getValue()) {
-                topicInfo =
-                        brokerRunManager.getPubBrokerTopicInfo(
-                                deployInfo.getBrokerId(), deployInfo.getTopicName());
-                if (topicInfo != null
-                        && topicInfo.isAcceptSubscribe()
-                        && !fbdTopicSet.contains(topicInfo.getTopic())) {
+                statusId = 0;
+                brokerRunManager.getSubBrokerTopicInfo(deployInfo.getBrokerId(),
+                        deployInfo.getTopicName(), topicSubInfo);
+                if (topicSubInfo.getF0()
+                        && topicSubInfo.getF1() != null
+                        && topicSubInfo.getF1().isAcceptSubscribe()
+                        && !fbdTopicSet.contains(deployInfo.getTopicName())) {
                     statusId = 1;
                 }
                 if (count++ == 0) {
@@ -1868,11 +1889,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
      * @param rebalanceId   the re-balance id
      * @param isFirstReb    whether is first re-balance
      * @param groups        the need re-balance group set
+     * @param strBuffer     string buffer
      */
-    public void processRebalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
+    public void processRebalance(long rebalanceId, boolean isFirstReb,
+                                 List<String> groups, StringBuilder strBuffer) {
         // #lizard forgives
-        Map<String, Map<String, List<Partition>>> finalSubInfoMap = null;
-        final StringBuilder strBuffer = new StringBuilder(512);
+        Map<String, Map<String, List<Partition>>> finalSubInfoMap;
         // choose different load balance strategy
         if (isFirstReb) {
             finalSubInfoMap = this.loadBalancer.bukAssign(consumerHolder,
@@ -1881,122 +1903,91 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             finalSubInfoMap = this.loadBalancer.balanceCluster(currentSubInfo,
                     consumerHolder, brokerRunManager, groups, defMetaDataService, strBuffer);
         }
+        boolean included;
+        String consumerId;
+        boolean isDelEmpty;
+        boolean isAddEmtpy;
+        ConsumerInfo consumerInfo;
+        Set<String> blackTopicSet;
+        List<SubscribeInfo> deletedSubInfoList;
+        List<SubscribeInfo> addedSubInfoList;
+        Map<String, Partition> currentPartMap;
+        Map<String, Map<String, Partition>> curTopicSubInfoMap;
         // allocate partitions to consumers
         for (Map.Entry<String, Map<String, List<Partition>>> entry : finalSubInfoMap.entrySet()) {
             if (entry == null) {
                 continue;
             }
-            String consumerId = entry.getKey();
+            consumerId = entry.getKey();
             if (consumerId == null) {
                 continue;
             }
-            ConsumerInfo consumerInfo =
-                    consumerHolder.getConsumerInfo(consumerId);
+            consumerInfo = consumerHolder.getConsumerInfo(consumerId);
             if (consumerInfo == null) {
                 continue;
             }
-            Set<String> blackTopicSet =
+            addedSubInfoList = new ArrayList<>();
+            deletedSubInfoList = new ArrayList<>();
+            blackTopicSet =
                     defMetaDataService.getDisableTopicByGroupName(consumerInfo.getGroupName());
-            Map<String, List<Partition>> topicSubPartMap = entry.getValue();
-            List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
-            List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
-            for (Map.Entry<String, List<Partition>> topicEntry : topicSubPartMap.entrySet()) {
+            for (Map.Entry<String, List<Partition>> topicEntry : entry.getValue().entrySet()) {
                 if (topicEntry == null) {
                     continue;
                 }
-                String topic = topicEntry.getKey();
-                List<Partition> finalPartList = topicEntry.getValue();
-                Map<String, Partition> currentPartMap = null;
-                Map<String, Map<String, Partition>> curTopicSubInfoMap =
-                        currentSubInfo.get(consumerId);
-                if (curTopicSubInfoMap == null || curTopicSubInfoMap.get(topic) == null) {
+                curTopicSubInfoMap = currentSubInfo.get(consumerId);
+                if (curTopicSubInfoMap == null
+                        || curTopicSubInfoMap.get(topicEntry.getKey()) == null) {
                     currentPartMap = new HashMap<>();
                 } else {
-                    currentPartMap = curTopicSubInfoMap.get(topic);
+                    currentPartMap = curTopicSubInfoMap.get(topicEntry.getKey());
                     if (currentPartMap == null) {
                         currentPartMap = new HashMap<>();
                     }
                 }
-                if (consumerInfo.isOverTLS()) {
-                    for (Partition currentPart : currentPartMap.values()) {
-                        if (!blackTopicSet.contains(currentPart.getTopic())) {
-                            boolean found = false;
-                            for (Partition newPart : finalPartList) {
-                                if (newPart.getPartitionFullStr(true)
-                                        .equals(currentPart.getPartitionFullStr(true))) {
-                                    found = true;
-                                    break;
-                                }
-                            }
-                            if (found) {
+                for (Partition currentPart : currentPartMap.values()) {
+                    included = false;
+                    if (!blackTopicSet.contains(currentPart.getTopic())) {
+                        for (Partition newPart : topicEntry.getValue()) {
+                            if (newPart == null) {
                                 continue;
                             }
-                        }
-                        deletedSubInfoList
-                                .add(new SubscribeInfo(consumerId, consumerInfo.getGroupName(),
-                                        consumerInfo.isOverTLS(), currentPart));
-                    }
-                    for (Partition finalPart : finalPartList) {
-                        if (!blackTopicSet.contains(finalPart.getTopic())) {
-                            boolean found = false;
-                            for (Partition curPart : currentPartMap.values()) {
-                                if (finalPart.getPartitionFullStr(true)
-                                        .equals(curPart.getPartitionFullStr(true))) {
-                                    found = true;
-                                    break;
-                                }
-                            }
-                            if (found) {
-                                continue;
+                            if (newPart.getPartitionKey().equals(
+                                    currentPart.getPartitionKey())) {
+                                included = true;
+                                break;
                             }
-                            addedSubInfoList.add(new SubscribeInfo(consumerId,
-                                    consumerInfo.getGroupName(), true, finalPart));
                         }
                     }
-                } else {
-                    for (Partition currentPart : currentPartMap.values()) {
-                        if ((blackTopicSet.contains(currentPart.getTopic()))
-                                || (!finalPartList.contains(currentPart))) {
-                            deletedSubInfoList.add(new SubscribeInfo(consumerId,
-                                    consumerInfo.getGroupName(), false, currentPart));
-                        }
+                    if (!included) {
+                        deletedSubInfoList.add(new SubscribeInfo(consumerId,
+                                consumerInfo.getGroupName(), consumerInfo.isOverTLS(), currentPart));
                     }
-                    for (Partition finalPart : finalPartList) {
-                        if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
-                                && (!blackTopicSet.contains(finalPart.getTopic()))) {
-                            addedSubInfoList.add(new SubscribeInfo(consumerId,
-                                    consumerInfo.getGroupName(), false, finalPart));
-                        }
+                }
+                for (Partition finalPart : topicEntry.getValue()) {
+                    if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
+                            && (!blackTopicSet.contains(finalPart.getTopic()))) {
+                        addedSubInfoList.add(new SubscribeInfo(consumerId,
+                                consumerInfo.getGroupName(), consumerInfo.isOverTLS(), finalPart));
                     }
                 }
             }
-            boolean isDelEmpty = deletedSubInfoList.isEmpty();
-            boolean isAddEmtpy = addedSubInfoList.isEmpty();
+            isDelEmpty = deletedSubInfoList.isEmpty();
+            isAddEmtpy = addedSubInfoList.isEmpty();
             if (!isDelEmpty) {
-                EventType opType =
-                        (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT;
-                consumerEventManager
-                        .addDisconnectEvent(consumerId,
-                                new ConsumerEvent(rebalanceId, opType,
-                                        deletedSubInfoList, EventStatus.TODO));
-                for (SubscribeInfo info : deletedSubInfoList) {
-                    logger.info(strBuffer.append("[Disconnect]")
-                            .append(info.toString()).toString());
-                    strBuffer.delete(0, strBuffer.length());
-                }
+                consumerEventManager.addDisconnectEvent(consumerId,
+                        new ConsumerEvent(rebalanceId,
+                                (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT,
+                                deletedSubInfoList, EventStatus.TODO));
+                printTODOContent(rebalanceId, consumerId,
+                        "Disconnect", deletedSubInfoList, strBuffer);
             }
             if (!isAddEmtpy) {
-                EventType opType =
-                        (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT;
-                consumerEventManager
-                        .addConnectEvent(consumerId,
-                                new ConsumerEvent(rebalanceId, opType,
-                                        addedSubInfoList, EventStatus.TODO));
-                for (SubscribeInfo info : addedSubInfoList) {
-                    logger.info(strBuffer.append("[Connect]")
-                            .append(info.toString()).toString());
-                    strBuffer.delete(0, strBuffer.length());
-                }
+                consumerEventManager.addConnectEvent(consumerId,
+                        new ConsumerEvent(rebalanceId,
+                                (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT,
+                                addedSubInfoList, EventStatus.TODO));
+                printTODOContent(rebalanceId, consumerId,
+                        "Connect", addedSubInfoList, strBuffer);
             }
         }
     }
@@ -2004,10 +1995,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     /**
      * process Reset balance
      */
-    public void processResetbalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
+    public void processResetbalance(long rebalanceId, boolean isFirstReb,
+                                    List<String> groups, StringBuilder strBuffer) {
         // #lizard forgives
-        final StringBuilder strBuffer = new StringBuilder(512);
-        Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap = null;
+        Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap;
         // choose different load balance strategy
         if (isFirstReb) {
             finalSubInfoMap =  this.loadBalancer.resetBukAssign(consumerHolder,
@@ -2016,46 +2007,51 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             finalSubInfoMap = this.loadBalancer.resetBalanceCluster(currentSubInfo,
                     consumerHolder, brokerRunManager, groups, this.defMetaDataService, strBuffer);
         }
+        String consumerId;
+        boolean isAddEmtpy;
+        boolean isDelEmpty;
+        ConsumerInfo consumerInfo;
+        Set<String> blackTopicSet;
+        List<SubscribeInfo> addedSubInfoList;
+        List<SubscribeInfo> deletedSubInfoList;
+        Map<String, Partition> finalPartMap;
+        Map<String, Partition> currentPartMap;
+        Map<String, Map<String, Partition>> curTopicSubInfoMap;
         // filter
         for (Map.Entry<String, Map<String, Map<String, Partition>>> entry
                 : finalSubInfoMap.entrySet()) {
             if (entry == null) {
                 continue;
             }
-            String consumerId = entry.getKey();
+            consumerId = entry.getKey();
             if (consumerId == null) {
                 continue;
             }
-            ConsumerInfo consumerInfo =
-                    consumerHolder.getConsumerInfo(consumerId);
+            consumerInfo = consumerHolder.getConsumerInfo(consumerId);
             if (consumerInfo == null) {
                 continue;
             }
             // allocate partitions to consumers
-            Set<String> blackTopicSet =
+            addedSubInfoList = new ArrayList<>();
+            deletedSubInfoList = new ArrayList<>();
+            blackTopicSet =
                     defMetaDataService.getDisableTopicByGroupName(consumerInfo.getGroupName());
-            Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
-            List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
-            List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
-            for (Map.Entry<String, Map<String, Partition>> topicEntry : topicSubPartMap.entrySet()) {
+            for (Map.Entry<String, Map<String, Partition>> topicEntry : entry.getValue().entrySet()) {
                 if (topicEntry == null) {
                     continue;
                 }
-                String topic = topicEntry.getKey();
-                Map<String, Partition> finalPartMap = topicEntry.getValue();
-                Map<String, Partition> currentPartMap = null;
-                Map<String, Map<String, Partition>> curTopicSubInfoMap =
-                        currentSubInfo.get(consumerId);
+                curTopicSubInfoMap = currentSubInfo.get(consumerId);
                 if (curTopicSubInfoMap == null
-                        || curTopicSubInfoMap.get(topic) == null) {
+                        || curTopicSubInfoMap.get(topicEntry.getKey()) == null) {
                     currentPartMap = new HashMap<>();
                 } else {
-                    currentPartMap = curTopicSubInfoMap.get(topic);
+                    currentPartMap = curTopicSubInfoMap.get(topicEntry.getKey());
                     if (currentPartMap == null) {
                         currentPartMap = new HashMap<>();
                     }
                 }
                 // filter
+                finalPartMap = topicEntry.getValue();
                 for (Partition currentPart : currentPartMap.values()) {
                     if ((blackTopicSet.contains(currentPart.getTopic()))
                             || (finalPartMap.get(currentPart.getPartitionKey()) == null)) {
@@ -2073,35 +2069,55 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 }
             }
             // generate consumer event
-            boolean isDelEmpty = deletedSubInfoList.isEmpty();
-            boolean isAddEmtpy = addedSubInfoList.isEmpty();
+            isDelEmpty = deletedSubInfoList.isEmpty();
+            isAddEmtpy = addedSubInfoList.isEmpty();
             if (!isDelEmpty) {
-                EventType opType =
-                        (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT;
                 consumerEventManager.addDisconnectEvent(consumerId,
-                        new ConsumerEvent(rebalanceId, opType,
+                        new ConsumerEvent(rebalanceId,
+                                (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT,
                                 deletedSubInfoList, EventStatus.TODO));
-                for (SubscribeInfo info : deletedSubInfoList) {
-                    logger.info(strBuffer.append("[ResetDisconnect]")
-                            .append(info.toString()).toString());
-                    strBuffer.delete(0, strBuffer.length());
-                }
+                printTODOContent(rebalanceId, consumerId,
+                        "ResetDisconnect", deletedSubInfoList, strBuffer);
             }
             if (!isAddEmtpy) {
-                EventType opType =
-                        (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT;
                 consumerEventManager.addConnectEvent(consumerId,
-                        new ConsumerEvent(rebalanceId, opType,
+                        new ConsumerEvent(rebalanceId,
+                                (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT,
                                 addedSubInfoList, EventStatus.TODO));
-                for (SubscribeInfo info : addedSubInfoList) {
-                    logger.info(strBuffer.append("[ResetConnect]")
-                            .append(info.toString()).toString());
-                    strBuffer.delete(0, strBuffer.length());
-                }
+                printTODOContent(rebalanceId, consumerId,
+                        "ResetConnect", addedSubInfoList, strBuffer);
             }
         }
     }
 
+    /**
+     * Print the TODO subscribe info
+     *
+     * @param rebalanceId    the rebalance id
+     * @param consumerId     the consumer id
+     * @param opType         the operation type
+     * @param subInfoList    the subscribe set
+     * @param strBuffer      the string buffer
+     */
+    private void printTODOContent(long rebalanceId, String consumerId, String opType,
+                                  List<SubscribeInfo> subInfoList, StringBuilder strBuffer) {
+        int recordCnt = 0;
+        strBuffer.append("[").append(opType).append("] TODO, rebalanceId=").append(rebalanceId)
+                .append(", client=").append(consumerId).append(", partitions=[");
+        for (SubscribeInfo info : subInfoList) {
+            if (info == null) {
+                continue;
+            }
+            if (recordCnt++ > 0) {
+                strBuffer.append(",");
+            }
+            strBuffer.append(info.getPartitionStr());
+        }
+        strBuffer.append("]");
+        logger.info(strBuffer.toString());
+        strBuffer.delete(0, strBuffer.length());
+    }
+
     /**
      * check if master subscribe info consist consumer subscribe info
      *
@@ -2177,6 +2193,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         List<String> groupsNeedToBalance = new ArrayList<>();
         Set<String> groupHasUnfinishedEvent = new HashSet<>();
         if (consumerEventManager.hasEvent()) {
+            String group;
             Set<String> consumerIdSet =
                     consumerEventManager.getUnProcessedIdSet();
             Map<String, TimeoutInfo> heartbeatMap =
@@ -2185,7 +2202,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 if (consumerId == null) {
                     continue;
                 }
-                String group = consumerHolder.getGroupName(consumerId);
+                group = consumerHolder.getGroupName(consumerId);
                 if (group == null) {
                     continue;
                 }
@@ -2238,23 +2255,20 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
      * build approved client configure
      *
      * @param inClientConfig client reported Configure info
+     * @param prodConfigTuple     published configure tuple (F0: configure id, F1: max message size)
      * @return ApprovedClientConfig
      */
     private ClientMaster.ApprovedClientConfig.Builder buildApprovedClientConfig(
-            ClientMaster.ApprovedClientConfig inClientConfig) {
+            ClientMaster.ApprovedClientConfig inClientConfig,
+            Tuple3<Long, Integer, Map<String, String>> prodConfigTuple) {
         ClientMaster.ApprovedClientConfig.Builder outClientConfig = null;
-        if (inClientConfig != null) {
-            outClientConfig = ClientMaster.ApprovedClientConfig.newBuilder();
-            ClusterSettingEntity settingEntity =
-                    this.defMetaDataService.getClusterDefSetting(false);
-            if (settingEntity == null) {
-                outClientConfig.setConfigId(TBaseConstants.META_VALUE_UNDEFINED);
-            } else {
-                outClientConfig.setConfigId(settingEntity.getSerialId());
-                if (settingEntity.getSerialId() != inClientConfig.getConfigId()) {
-                    outClientConfig.setMaxMsgSize(settingEntity.getMaxMsgSizeInB());
-                }
-            }
+        if (inClientConfig == null) {
+            return outClientConfig;
+        }
+        outClientConfig = ClientMaster.ApprovedClientConfig.newBuilder();
+        outClientConfig.setConfigId(prodConfigTuple.getF0());
+        if (inClientConfig.getConfigId() != prodConfigTuple.getF0()) {
+            outClientConfig.setMaxMsgSize(prodConfigTuple.getF1());
         }
         return outClientConfig;
     }
@@ -2407,7 +2421,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                     .append(consumerId).append(" report the info: [");
             printHeader = false;
         }
-        sBuffer.append(type).append(info.toString()).append(", ");
+        sBuffer.append(type).append(info).append(", ");
         return printHeader;
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 8b066bab9..7a90fab08 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -165,8 +165,8 @@ public class BrokerAbnHolder {
         if (brokerFbdInfo == null) {
             return retTuple;
         }
-        retTuple.setF0AndF1(brokerFbdInfo.newStatus.isAcceptPublish(),
-                brokerFbdInfo.newStatus.isAcceptSubscribe());
+        retTuple.setF0AndF1(!brokerFbdInfo.newStatus.isAcceptPublish(),
+                !brokerFbdInfo.newStatus.isAcceptSubscribe());
         return retTuple;
     }
 
@@ -191,7 +191,6 @@ public class BrokerAbnHolder {
         brokerForbiddenCount.set(0);
         brokerAbnormalMap.clear();
         brokerForbiddenMap.clear();
-
     }
 
     public int getCurrentBrokerCount() {
@@ -355,5 +354,4 @@ public class BrokerAbnHolder {
                 .toString();
         }
     }
-
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
index d3cf14c48..d8433c84f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
@@ -166,13 +166,26 @@ public class BrokerPSInfoHolder {
         return subTopicInfoView.getAcceptSubParts(topic, enableSubBrokerIdSet);
     }
 
+    /**
+     * Get the subscribed TopicInfo information of topic in broker
+     *
+     * @param brokerId need query broker
+     * @param topic    need query topic
+     * @param result   query result (F0: whether the broker accepts subscribe, F1: null or topicInfo configure)
+     */
+    public void getBrokerSubPushedTopicInfo(int brokerId, String topic,
+                                            Tuple2<Boolean, TopicInfo> result) {
+        result.setF0AndF1(enableSubBrokerIdSet.contains(brokerId),
+                subTopicInfoView.getBrokerPushedTopicInfo(brokerId, topic));
+    }
+
     /**
      * Gets the string map of topic partitions whose publish status is enabled
      *
-     * @param topicSet need query topic set
+     * @param topicSizeMap map from each topic to query to that topic's max message size
      */
-    public Map<String, String> getAcceptPubPartInfo(Set<String> topicSet) {
-        return pubTopicInfoView.getAcceptPubPartInfo(topicSet, enablePubBrokerIdSet);
+    public Map<String, String> getAcceptPubPartInfo(Map<String, Integer> topicSizeMap) {
+        return pubTopicInfoView.getAcceptPubPartInfo(topicSizeMap, enablePubBrokerIdSet);
     }
 
     /**
@@ -180,11 +193,14 @@ public class BrokerPSInfoHolder {
      *
      * @param brokerId need query broker
      * @param topic    need query topic
-     *
-     * @return null or topicInfo configure
+     * @param result   query result (F0: whether the broker accepts publish,
+     *                       F1: whether the broker accepts subscribe, F2: null or topicInfo configure)
      */
-    public TopicInfo getBrokerPubPushedTopicInfo(int brokerId, String topic) {
-        return pubTopicInfoView.getBrokerPushedTopicInfo(brokerId, topic);
+    public void getBrokerPubPushedTopicInfo(int brokerId, String topic,
+                                            Tuple3<Boolean, Boolean, TopicInfo> result) {
+        result.setFieldsValue(enablePubBrokerIdSet.contains(brokerId),
+                enableSubBrokerIdSet.contains(brokerId),
+                pubTopicInfoView.getBrokerPushedTopicInfo(brokerId, topic));
     }
 
     /**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
index 73d8e5a1d..1d768a5d3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
@@ -92,7 +92,7 @@ public interface BrokerRunManager {
 
     BrokerAbnHolder getBrokerAbnHolder();
 
-    Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> topicSet);
+    Map<String, String> getPubBrokerAcceptPubPartInfo(Map<String, Integer> topicSizeMap);
 
     int getSubTopicMaxBrokerCount(Set<String> topicSet);
 
@@ -100,7 +100,10 @@ public interface BrokerRunManager {
 
     List<Partition> getSubBrokerAcceptSubParts(String topic);
 
-    TopicInfo getPubBrokerTopicInfo(int brokerId, String topic);
+    void getSubBrokerTopicInfo(int brokerId, String topic, Tuple2<Boolean, TopicInfo> result);
+
+    void getPubBrokerTopicInfo(int brokerId, String topic,
+                               Tuple3<Boolean, Boolean, TopicInfo> result);
 
     void getPubBrokerPushedTopicInfo(int brokerId,
                                      Tuple3<Boolean, Boolean, List<TopicInfo>> result);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
index 77cd64faf..a9a50b764 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
@@ -338,5 +338,4 @@ public class BrokerSyncData {
         }
         return CheckSum.crc32(buffer.array());
     }
-
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
index 6ac080b97..b63251867 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
@@ -195,23 +195,23 @@ public class BrokerTopicInfoView {
     /**
      * Gets the string map of topic partitions whose publish status is enabled
      *
-     * @param topicSet need query topic set
+     * @param topicSizeMap map from each topic to query to that topic's max message size
      * @param enablePubBrokerIdSet  need filtered broker id set
      * @return  query result
      */
-    public Map<String, String> getAcceptPubPartInfo(Set<String> topicSet,
+    public Map<String, String> getAcceptPubPartInfo(Map<String, Integer> topicSizeMap,
                                                     Set<Integer> enablePubBrokerIdSet) {
         TopicInfo topicInfo;
         ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
         Map<String, String> topicPartStrMap = new HashMap<>();
         Map<String, StringBuilder> topicPartBufferMap = new HashMap<>();
-        if (topicSet == null || topicSet.isEmpty()) {
+        if (topicSizeMap == null || topicSizeMap.isEmpty()) {
             return topicPartStrMap;
         }
-        for (String topic : topicSet) {
-            if (topic == null) {
-                continue;
-            }
+        // build topic-partition information
+        StringBuilder tmpValue;
+        StringBuilder confValue;
+        for (String topic : topicSizeMap.keySet()) {
             topicInfoView = topicConfInfoMap.get(topic);
             if (topicInfoView == null
                     || topicInfoView.isEmpty()) {
@@ -225,23 +225,33 @@ public class BrokerTopicInfoView {
                 }
                 topicInfo = entry.getValue();
                 if (topicInfo.isAcceptPublish()) {
-                    StringBuilder tmpValue = topicPartBufferMap.get(topic);
-                    if (tmpValue == null) {
-                        StringBuilder strBuffer =
-                                new StringBuilder(512).append(topic)
-                                        .append(TokenConstants.SEGMENT_SEP)
-                                        .append(topicInfo.getSimpleValue());
-                        topicPartBufferMap.put(topic, strBuffer);
+                    confValue = topicPartBufferMap.get(topic);
+                    if (confValue == null) {
+                        tmpValue = new StringBuilder(512).append(topic)
+                                .append(TokenConstants.SEGMENT_SEP)
+                                .append(topicInfo.getSimpleValue());
+                        topicPartBufferMap.put(topic, tmpValue);
                     } else {
-                        tmpValue.append(TokenConstants.ARRAY_SEP)
+                        confValue.append(TokenConstants.ARRAY_SEP)
                                 .append(topicInfo.getSimpleValue());
                     }
                 }
             }
         }
+        // append max message size information
+        Integer maxMsgSize;
         for (Map.Entry<String, StringBuilder> entry : topicPartBufferMap.entrySet()) {
-            if (entry.getValue() != null) {
-                topicPartStrMap.put(entry.getKey(), entry.getValue().toString());
+            if (entry.getValue() == null) {
+                continue;
+            }
+            confValue = topicPartBufferMap.get(entry.getKey());
+            maxMsgSize = topicSizeMap.get(entry.getKey());
+            if (maxMsgSize == null) {
+                topicPartStrMap.put(entry.getKey(),
+                        confValue.append(TokenConstants.SEGMENT_SEP).toString());
+            } else {
+                topicPartStrMap.put(entry.getKey(),
+                        confValue.append(TokenConstants.SEGMENT_SEP).append(maxMsgSize).toString());
             }
         }
         topicPartBufferMap.clear();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index c274f67a5..26678c2b1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -509,8 +509,8 @@ public class DefBrokerRunManager implements BrokerRunManager, ConfigObserver {
     }
 
     @Override
-    public Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> topicSet) {
-        return brokerPubSubInfo.getAcceptPubPartInfo(topicSet);
+    public Map<String, String> getPubBrokerAcceptPubPartInfo(Map<String, Integer> topicSizeMap) {
+        return brokerPubSubInfo.getAcceptPubPartInfo(topicSizeMap);
     }
 
     @Override
@@ -529,8 +529,15 @@ public class DefBrokerRunManager implements BrokerRunManager, ConfigObserver {
     }
 
     @Override
-    public TopicInfo getPubBrokerTopicInfo(int brokerId, String topic) {
-        return brokerPubSubInfo.getBrokerPubPushedTopicInfo(brokerId, topic);
+    public void getSubBrokerTopicInfo(int brokerId, String topic,
+                                      Tuple2<Boolean, TopicInfo> result) {
+        brokerPubSubInfo.getBrokerSubPushedTopicInfo(brokerId, topic, result);
+    }
+
+    @Override
+    public void getPubBrokerTopicInfo(int brokerId, String topic,
+                                      Tuple3<Boolean, Boolean, TopicInfo> result) {
+        brokerPubSubInfo.getBrokerPubPushedTopicInfo(brokerId, topic, result);
     }
 
     @Override
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index 9543ae632..0dc4dbb97 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -27,7 +27,7 @@ import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
-import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.corebase.utils.Tuple3;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
@@ -68,6 +68,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
         // register modify method
         registerModifyWebMethod("admin_transfer_current_master",
                 "transferCurrentMaster");
+        // register cluster default setting modify methods
         registerModifyWebMethod("admin_set_cluster_default_setting",
                 "adminSetClusterDefSetting");
         registerModifyWebMethod("admin_update_cluster_default_setting",
@@ -274,9 +275,11 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
         int totalRunTopicStoreCount = 0;
         boolean isSrvAcceptPublish = false;
         boolean isSrvAcceptSubscribe = false;
-        boolean isAcceptPublish = false;
-        boolean isAcceptSubscribe = false;
         boolean enableAuthControl = false;
+        TopicPropGroup topicProps;
+        TopicCtrlEntity authEntity;
+        BrokerConfEntity brokerConfEntity;
+        Tuple3<Boolean, Boolean, TopicInfo> topicInfoTuple = new Tuple3<>();
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
         for (Map.Entry<String, List<TopicDeployEntity>> entry : topicConfMap.entrySet()) {
             if (totalCount++ > 0) {
@@ -290,40 +293,32 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
             isSrvAcceptPublish = false;
             isSrvAcceptSubscribe = false;
             enableAuthControl = false;
-            isAcceptPublish = false;
-            isAcceptSubscribe = false;
             for (TopicDeployEntity entity : entry.getValue()) {
-                BrokerConfEntity brokerConfEntity =
+                brokerConfEntity =
                         defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId());
                 if (brokerConfEntity == null) {
                     continue;
                 }
                 brokerCount++;
-                Tuple2<Boolean, Boolean> pubSubStatus =
-                        WebParameterUtils.getPubSubStatusByManageStatus(
-                                brokerConfEntity.getManageStatus().getCode());
-                isAcceptPublish = pubSubStatus.getF0();
-                isAcceptSubscribe = pubSubStatus.getF1();
-                TopicPropGroup topicProps = entity.getTopicProps();
+                topicProps = entity.getTopicProps();
                 totalCfgTopicStoreCount += topicProps.getNumTopicStores();
                 totalCfgNumPartCount +=
                         topicProps.getNumPartitions() * topicProps.getNumTopicStores();
-                TopicInfo topicInfo =
-                        brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
-                if (topicInfo != null) {
-                    if (isAcceptPublish && topicInfo.isAcceptPublish()) {
+                brokerRunManager.getPubBrokerTopicInfo(
+                        entity.getBrokerId(), entity.getTopicName(), topicInfoTuple);
+                if (topicInfoTuple.getF2() != null) {
+                    if (topicInfoTuple.getF0() && topicInfoTuple.getF2().isAcceptPublish()) {
                         isSrvAcceptPublish = true;
                     }
-                    if (isAcceptSubscribe && topicInfo.isAcceptSubscribe()) {
+                    if (topicInfoTuple.getF1() && topicInfoTuple.getF2().isAcceptSubscribe()) {
                         isSrvAcceptSubscribe = true;
                     }
-                    totalRunTopicStoreCount += topicInfo.getTopicStoreNum();
+                    totalRunTopicStoreCount += topicInfoTuple.getF2().getTopicStoreNum();
                     totalRunNumPartCount +=
-                            topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
+                            topicInfoTuple.getF2().getPartitionNum() * topicInfoTuple.getF2().getTopicStoreNum();
                 }
             }
-            TopicCtrlEntity authEntity =
-                    defMetaDataService.getTopicCtrlByTopicName(entry.getKey());
+            authEntity = defMetaDataService.getTopicCtrlByTopicName(entry.getKey());
             if (authEntity != null) {
                 enableAuthControl = authEntity.isAuthCtrlEnable();
             }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index 344cb0f71..8d991bbb2 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -28,8 +28,8 @@ import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.corebase.utils.Tuple3;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
-import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
 import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
 import org.apache.inlong.tubemq.server.common.statusdef.TopicStsChgType;
 import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
@@ -607,16 +607,19 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
                                               Map<String, List<TopicDeployEntity>> topicDeployInfoMap) {
         // build query result
         int totalCnt = 0;
+        int itemCount = 0;
+        int condStatusId = 1;
         int maxMsgSizeInMB = 0;
         int totalCfgNumPartCount = 0;
         int totalRunNumPartCount = 0;
         boolean enableAuthCtrl;
         boolean isSrvAcceptPublish = false;
         boolean isSrvAcceptSubscribe = false;
-        boolean isAcceptPublish = false;
-        boolean isAcceptSubscribe = false;
-        ManageStatus manageStatus;
-        Tuple2<Boolean, Boolean> pubSubStatus;
+        String strManageStatus;
+        TopicCtrlEntity ctrlEntity;
+        BrokerConfEntity brokerConfEntity;
+        List<GroupConsumeCtrlEntity> groupCtrlInfoLst;
+        Tuple3<Boolean, Boolean, TopicInfo> topicInfoTuple = new Tuple3<>();
         BrokerRunManager brokerRunManager = master.getBrokerRunManager();
         ClusterSettingEntity defSetting = defMetaDataService.getClusterDefSetting(false);
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
@@ -625,11 +628,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
             totalRunNumPartCount = 0;
             isSrvAcceptPublish = false;
             isSrvAcceptSubscribe = false;
-            isAcceptPublish = false;
-            isAcceptSubscribe = false;
-            enableAuthCtrl = false;
-            TopicCtrlEntity ctrlEntity =
-                    defMetaDataService.getTopicCtrlByTopicName(entry.getKey());
+            ctrlEntity = defMetaDataService.getTopicCtrlByTopicName(entry.getKey());
             if (ctrlEntity == null) {
                 continue;
             }
@@ -644,54 +643,55 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
             sBuffer.append("{\"topicName\":\"").append(entry.getKey())
                     .append("\",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB)
                     .append(",\"topicInfo\":[");
-            int brokerCount = 0;
+            itemCount = 0;
             for (TopicDeployEntity entity : entry.getValue()) {
-                if (brokerCount++ > 0) {
+                if (itemCount++ > 0) {
                     sBuffer.append(",");
                 }
                 totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
                 entity.toWebJsonStr(sBuffer, true, false);
                 sBuffer.append(",\"runInfo\":{");
-                BrokerConfEntity brokerConfEntity =
+                strManageStatus = "-";
+                brokerConfEntity =
                         defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId());
-                String strManageStatus = "-";
                 if (brokerConfEntity != null) {
-                    manageStatus = brokerConfEntity.getManageStatus();
-                    strManageStatus = manageStatus.getDescription();
-                    pubSubStatus = manageStatus.getPubSubStatus();
-                    isAcceptPublish = pubSubStatus.getF0();
-                    isAcceptSubscribe = pubSubStatus.getF1();
+                    strManageStatus = brokerConfEntity.getManageStatus().getDescription();
                 }
-                TopicInfo topicInfo =
-                        brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
-                if (topicInfo == null) {
+                brokerRunManager.getPubBrokerTopicInfo(
+                        entity.getBrokerId(), entity.getTopicName(), topicInfoTuple);
+                if (topicInfoTuple.getF2() == null) {
                     sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
                             .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
                 } else {
-                    if (isAcceptPublish) {
-                        sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish());
-                        if (topicInfo.isAcceptPublish()) {
+                    if (topicInfoTuple.getF0()) {
+                        sBuffer.append("\"acceptPublish\":")
+                                .append(topicInfoTuple.getF2().isAcceptPublish());
+                        if (topicInfoTuple.getF2().isAcceptPublish()) {
                             isSrvAcceptPublish = true;
                         }
                     } else {
                         sBuffer.append("\"acceptPublish\":false");
                     }
-                    if (isAcceptSubscribe) {
-                        sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe());
-                        if (topicInfo.isAcceptSubscribe()) {
+                    if (topicInfoTuple.getF1()) {
+                        sBuffer.append(",\"acceptSubscribe\":")
+                                .append(topicInfoTuple.getF2().isAcceptSubscribe());
+                        if (topicInfoTuple.getF2().isAcceptSubscribe()) {
                             isSrvAcceptSubscribe = true;
                         }
                     } else {
                         sBuffer.append(",\"acceptSubscribe\":false");
                     }
-                    totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
-                    sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum())
-                            .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum())
+                    totalRunNumPartCount +=
+                            topicInfoTuple.getF2().getPartitionNum() * topicInfoTuple.getF2().getTopicStoreNum();
+                    sBuffer.append(",\"numPartitions\":")
+                            .append(topicInfoTuple.getF2().getPartitionNum())
+                            .append(",\"numTopicStores\":")
+                            .append(topicInfoTuple.getF2().getTopicStoreNum())
                             .append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\"");
                 }
                 sBuffer.append("}}");
             }
-            sBuffer.append("],\"infoCount\":").append(brokerCount)
+            sBuffer.append("],\"infoCount\":").append(itemCount)
                     .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
                     .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
                     .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
@@ -702,9 +702,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
                         .append(",\"createUser\":\"").append(ctrlEntity.getModifyUser())
                         .append("\",\"createDate\":\"").append(ctrlEntity.getModifyDateStr())
                         .append("\",\"authConsumeGroup\":[");
-                List<GroupConsumeCtrlEntity> groupCtrlInfoLst =
-                        defMetaDataService.getConsumeCtrlByTopic(entry.getKey());
-                int itemCount = 0;
+                itemCount = 0;
+                groupCtrlInfoLst = defMetaDataService.getConsumeCtrlByTopic(entry.getKey());
                 for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
                     if (itemCount++ > 0) {
                         sBuffer.append(",");
@@ -718,7 +717,6 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
                 }
                 sBuffer.append("],\"groupCount\":").append(itemCount).append(",\"authFilterCondSet\":[");
                 itemCount = 0;
-                int condStatusId = 1;
                 for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
                     if (itemCount++ > 0) {
                         sBuffer.append(",");
@@ -753,14 +751,16 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
                                               Map<String, List<TopicDeployEntity>> topicDeployMap) {
         // build query result
         int totalCnt = 0;
+        int itemCount = 0;
         int totalCfgNumPartCount = 0;
         int totalRunNumPartCount = 0;
         boolean isSrvAcceptPublish = false;
         boolean isSrvAcceptSubscribe = false;
-        boolean isAcceptPublish = false;
-        boolean isAcceptSubscribe = false;
-        ManageStatus manageStatus;
-        Tuple2<Boolean, Boolean> pubSubStatus;
+        String strManageStatus;
+        TopicCtrlEntity ctrlEntity;
+        BrokerConfEntity brokerConfEntity;
+        List<GroupConsumeCtrlEntity> groupCtrlInfoLst;
+        Tuple3<Boolean, Boolean, TopicInfo> topicInfoTuple = new Tuple3<>();
         BrokerRunManager brokerRunManager = master.getBrokerRunManager();
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
         for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployMap.entrySet()) {
@@ -768,9 +768,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
             totalRunNumPartCount = 0;
             isSrvAcceptPublish = false;
             isSrvAcceptSubscribe = false;
-            isAcceptPublish = false;
-            isAcceptSubscribe = false;
-            TopicCtrlEntity ctrlEntity =
+            ctrlEntity =
                     defMetaDataService.getTopicCtrlByTopicName(entry.getKey());
             if (ctrlEntity == null) {
                 continue;
@@ -780,71 +778,71 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
             }
             ctrlEntity.toWebJsonStr(sBuffer, true, false);
             sBuffer.append(",\"deployInfo\":[");
-            int brokerCount = 0;
+            itemCount = 0;
             for (TopicDeployEntity entity : entry.getValue()) {
-                if (brokerCount++ > 0) {
+                if (itemCount++ > 0) {
                     sBuffer.append(",");
                 }
                 totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
                 entity.toWebJsonStr(sBuffer, true, false);
                 sBuffer.append(",\"runInfo\":{");
-                BrokerConfEntity brokerConfEntity =
+                brokerConfEntity =
                         defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId());
-
-                String strManageStatus = "-";
+                strManageStatus = "-";
                 if (brokerConfEntity != null) {
-                    manageStatus = brokerConfEntity.getManageStatus();
-                    strManageStatus = manageStatus.getDescription();
-                    pubSubStatus = manageStatus.getPubSubStatus();
-                    isAcceptPublish = pubSubStatus.getF0();
-                    isAcceptSubscribe = pubSubStatus.getF1();
+                    strManageStatus = brokerConfEntity.getManageStatus().getDescription();
                 }
-                TopicInfo topicInfo =
-                        brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
-                if (topicInfo == null) {
+                brokerRunManager.getPubBrokerTopicInfo(
+                        entity.getBrokerId(), entity.getTopicName(), topicInfoTuple);
+                if (topicInfoTuple.getF2() == null) {
                     sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
                             .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
                 } else {
-                    if (isAcceptPublish) {
-                        sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish());
-                        if (topicInfo.isAcceptPublish()) {
+                    if (topicInfoTuple.getF0()) {
+                        sBuffer.append("\"acceptPublish\":")
+                                .append(topicInfoTuple.getF2().isAcceptPublish());
+                        if (topicInfoTuple.getF2().isAcceptPublish()) {
                             isSrvAcceptPublish = true;
                         }
                     } else {
                         sBuffer.append("\"acceptPublish\":false");
                     }
-                    if (isAcceptSubscribe) {
-                        sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe());
-                        if (topicInfo.isAcceptSubscribe()) {
+                    if (topicInfoTuple.getF1()) {
+                        sBuffer.append(",\"acceptSubscribe\":")
+                                .append(topicInfoTuple.getF2().isAcceptSubscribe());
+                        if (topicInfoTuple.getF2().isAcceptSubscribe()) {
                             isSrvAcceptSubscribe = true;
                         }
                     } else {
                         sBuffer.append(",\"acceptSubscribe\":false");
                     }
-                    totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
-                    sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum())
-                            .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum())
+                    totalRunNumPartCount +=
+                            topicInfoTuple.getF2().getPartitionNum() * topicInfoTuple.getF2().getTopicStoreNum();
+                    sBuffer.append(",\"numPartitions\":")
+                            .append(topicInfoTuple.getF2().getPartitionNum())
+                            .append(",\"numTopicStores\":")
+                            .append(topicInfoTuple.getF2().getTopicStoreNum())
                             .append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\"");
                 }
                 sBuffer.append("}}");
             }
-            sBuffer.append("],\"infoCount\":").append(brokerCount)
+            sBuffer.append("],\"infoCount\":").append(itemCount)
                     .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
                     .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
                     .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
                     .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount);
             if (withAuthInfo) {
                 sBuffer.append(",\"groupAuthInfo\":[");
-                List<GroupConsumeCtrlEntity> groupCtrlInfoLst =
+                groupCtrlInfoLst =
                         defMetaDataService.getConsumeCtrlByTopic(entry.getKey());
-                int countJ = 0;
+                itemCount = 0;
                 for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
-                    if (countJ++ > 0) {
+                    if (itemCount++ > 0) {
                         sBuffer.append(",");
                     }
                     groupEntity.toWebJsonStr(sBuffer, true, true);
                 }
-                sBuffer.append("],\"groupAuthCount\":").append(countJ);
+                sBuffer.append("],\"groupAuthCount\":").append(itemCount);
             }
             sBuffer.append("}");
         }
@@ -1073,5 +1071,4 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
         }
         return buildRetInfo(retInfo, sBuffer);
     }
-
 }