You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/15 10:57:54 UTC

[incubator-inlong] branch master updated: [INLONG-1794]Add consumer message processing logic (#1795)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e7386b  [INLONG-1794]Add consumer message processing logic (#1795)
6e7386b is described below

commit 6e7386bdd88987affdf808a2f13aa4a91ac401d9
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Nov 15 18:57:45 2021 +0800

    [INLONG-1794]Add consumer message processing logic (#1795)
---
 .../inlong/tubemq/corebase/TBaseConstants.java     |   3 +
 .../inlong/tubemq/corebase/utils/OpsSyncInfo.java  | 117 ++++
 .../inlong/tubemq/server/master/MasterConfig.java  |  13 +
 .../inlong/tubemq/server/master/TMaster.java       | 741 +++++++++++++++++++--
 .../nodemanage/nodeconsumer/ConsumeGroupInfo.java  |  58 ++
 .../nodemanage/nodeconsumer/TopicConfigInfo.java   |  65 --
 6 files changed, 894 insertions(+), 103 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
index a15126b..ca5d31f 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
@@ -80,4 +80,7 @@ public class TBaseConstants {
     public static final int META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT =
             META_MAX_ALLOWED_MESSAGE_SIZE_MB * META_MB_UNIT_SIZE;
 
+    public static final long CFG_DEF_META_FORCE_UPDATE_PERIOD = 3 * 60 * 1000;
+    public static final long CFG_MIN_META_FORCE_UPDATE_PERIOD = 1 * 60 * 1000;
+
 }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/OpsSyncInfo.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/OpsSyncInfo.java
new file mode 100644
index 0000000..4eb9dc6
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/OpsSyncInfo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.utils;
+
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
+
+public class OpsSyncInfo {
+
+    private boolean updated = false;
+    private long groupFlowChkId = TBaseConstants.META_VALUE_UNDEFINED;
+    private long defFlowChkId = TBaseConstants.META_VALUE_UNDEFINED;
+    private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
+    private long csmFrmMaxOffsetCtrlId = TBaseConstants.META_VALUE_UNDEFINED;
+    private boolean requireAuth = false;
+    private String defFlowControlInfo = "";
+    private String groupFlowControlInfo = "";
+
+    public OpsSyncInfo() {
+
+    }
+
+    public void updOpsSyncInfo(ClientMaster.OpsTaskInfo opsTaskInfo) {
+        if (opsTaskInfo == null) {
+            return;
+        }
+        if (opsTaskInfo.hasDefFlowCheckId()) {
+            this.defFlowChkId = opsTaskInfo.getDefFlowCheckId();
+            this.updated = true;
+        }
+        if (opsTaskInfo.hasGroupFlowCheckId()) {
+            this.groupFlowChkId = opsTaskInfo.getGroupFlowCheckId();
+            this.updated = true;
+        }
+        if (opsTaskInfo.hasQryPriorityId()) {
+            this.qryPriorityId = opsTaskInfo.getQryPriorityId();
+            this.updated = true;
+        }
+        if (opsTaskInfo.hasCsmFrmMaxOffsetCtrlId()
+                && opsTaskInfo.getCsmFrmMaxOffsetCtrlId() >= 0) {
+            this.csmFrmMaxOffsetCtrlId = opsTaskInfo.getCsmFrmMaxOffsetCtrlId();
+            this.updated = true;
+        }
+        if (opsTaskInfo.hasRequireAuth() && opsTaskInfo.getRequireAuth()) {
+            this.requireAuth = true;
+            this.updated = true;
+        }
+        if (opsTaskInfo.hasDefFlowControlInfo()
+                && TStringUtils.isNotBlank(opsTaskInfo.getDefFlowControlInfo())) {
+            this.defFlowControlInfo = opsTaskInfo.getDefFlowControlInfo();
+            this.updated = true;
+        }
+        if (opsTaskInfo.hasGroupFlowControlInfo()
+                && TStringUtils.isNotBlank(opsTaskInfo.getGroupFlowControlInfo())) {
+            this.groupFlowControlInfo = opsTaskInfo.getGroupFlowControlInfo();
+            this.updated = true;
+        }
+    }
+
+    public boolean isUpdated() {
+        return updated;
+    }
+
+    public long getGroupFlowChkId() {
+        return groupFlowChkId;
+    }
+
+    public long getDefFlowChkId() {
+        return defFlowChkId;
+    }
+
+    public int getQryPriorityId() {
+        return qryPriorityId;
+    }
+
+    public long getCsmFromMaxOffsetCtrlId() {
+        return csmFrmMaxOffsetCtrlId;
+    }
+
+    public boolean isRequireAuth() {
+        return requireAuth;
+    }
+
+    public String getDefFlowControlInfo() {
+        return defFlowControlInfo;
+    }
+
+    public String getGroupFlowControlInfo() {
+        return groupFlowControlInfo;
+    }
+
+    public void clear() {
+        this.updated = false;
+        this.groupFlowChkId = TBaseConstants.META_VALUE_UNDEFINED;
+        this.defFlowChkId = TBaseConstants.META_VALUE_UNDEFINED;
+        this.qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
+        this.csmFrmMaxOffsetCtrlId = TBaseConstants.META_VALUE_UNDEFINED;
+        this.requireAuth = false;
+        this.defFlowControlInfo = "";
+        this.groupFlowControlInfo = "";
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
index 1afabf7..0daa096 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
@@ -80,6 +80,7 @@ public class MasterConfig extends AbstractFileConfig {
     private String visitPassword = "";
     private long authValidTimeStampPeriodMs = TBaseConstants.CFG_DEFAULT_AUTH_TIMESTAMP_VALID_INTERVAL;
     private int rebalanceParallel = 4;
+    private long maxMetaForceUpdatePeriodMs = TBaseConstants.CFG_DEF_META_FORCE_UPDATE_PERIOD;
 
     /**
      * getters
@@ -258,6 +259,10 @@ public class MasterConfig extends AbstractFileConfig {
         return rebalanceParallel;
     }
 
+    public long getMaxMetaForceUpdatePeriodMs() {
+        return maxMetaForceUpdatePeriodMs;
+    }
+
     /**
      * Load file section attributes
      *
@@ -489,6 +494,13 @@ public class MasterConfig extends AbstractFileConfig {
             int tmpParallel = this.getInt(masterConf, "rebalanceParallel");
             this.rebalanceParallel = MixedUtils.mid(tmpParallel, 1, 20);
         }
+        if (TStringUtils.isNotBlank(masterConf.get("maxMetaForceUpdatePeriodMs"))) {
+            long tmpPeriodMs = this.getLong(masterConf, "maxMetaForceUpdatePeriodMs");
+            if (tmpPeriodMs < TBaseConstants.CFG_MIN_META_FORCE_UPDATE_PERIOD) {
+                tmpPeriodMs = TBaseConstants.CFG_MIN_META_FORCE_UPDATE_PERIOD;
+            }
+            this.maxMetaForceUpdatePeriodMs = tmpPeriodMs;
+        }
     }
 
     /**
@@ -636,6 +648,7 @@ public class MasterConfig extends AbstractFileConfig {
                 .append("visitName", visitName)
                 .append("visitPassword", visitPassword)
                 .append("rebalanceParallel", rebalanceParallel)
+                .append("maxMetaForceUpdatePeriodMs", maxMetaForceUpdatePeriodMs)
                 .append(",").append(replicationConfig.toString())
                 .append(",").append(tlsConfig.toString())
                 .append(",").append(zkConfig.toString())
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 0bee4de..f323287 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -45,6 +45,7 @@ import org.apache.inlong.tubemq.corebase.cluster.NodeAddrInfo;
 import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.corebase.cluster.ProducerInfo;
 import org.apache.inlong.tubemq.corebase.cluster.SubscribeInfo;
+import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
 import org.apache.inlong.tubemq.corebase.config.TLSConfig;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestB2M;
@@ -77,6 +78,7 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.Registe
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2P;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
+import org.apache.inlong.tubemq.corebase.utils.OpsSyncInfo;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
@@ -97,6 +99,7 @@ import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage;
 import org.apache.inlong.tubemq.server.common.offsetstorage.ZkOffsetStorage;
 import org.apache.inlong.tubemq.server.common.paramcheck.PBParameterUtils;
 import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
+import org.apache.inlong.tubemq.server.common.utils.ClientSyncInfo;
 import org.apache.inlong.tubemq.server.common.utils.HasThread;
 import org.apache.inlong.tubemq.server.common.utils.RowLock;
 import org.apache.inlong.tubemq.server.common.utils.Sleeper;
@@ -106,6 +109,7 @@ import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager;
@@ -135,7 +139,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     private final BrokerRunManager brokerRunManager;       // broker run status manager
     private final ConsumerEventManager consumerEventManager;    //consumer event manager
     private final TopicPSInfoManager topicPSInfoManager;        //topic publish/subscribe info manager
-    private final ExecutorService executor;
+    private final ExecutorService svrExecutor;
+    private final ExecutorService cltExecutor;
     private final ProducerInfoHolder producerHolder;            //producer holder
     private final ConsumerInfoHolder consumerHolder;            //consumer holder
     private final RowLock masterRowLock;                        //lock
@@ -154,7 +159,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     private boolean initialized = false;
     private boolean startupBalance = true;
     private int balanceDelayTimes = 0;
-    private AtomicInteger curBalanceParal = new AtomicInteger(0);
+    private AtomicInteger curSvrBalanceParal = new AtomicInteger(0);
+    private AtomicInteger curCltBalanceParal = new AtomicInteger(0);
     private Sleeper stopSleeper = new Sleeper(1000, this);
     private SimpleVisitTokenManager visitTokenManager;
 
@@ -171,7 +177,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         this.checkAndCreateBdbDataPath();
         this.masterAddInfo =
                 new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort());
-        this.executor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
+        this.svrExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
+        this.cltExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
         this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig);
         this.serverAuthHandler = new SimpleCertificateMasterHandler(this.masterConfig);
         this.heartbeatManager = new HeartbeatManager();
@@ -488,7 +495,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         checkNodeStatus(producerId, strBuffer);
         new ReleaseProducer().run(producerId);
         heartbeatManager.unRegProducerNode(producerId);
-        logger.info(strBuffer.append("Producer Closed")
+        logger.info(strBuffer.append("[Producer Closed] ")
                 .append(producerId).append(", isOverTLS=").append(overtls).toString());
         builder.setSuccess(true);
         builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -740,7 +747,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         // authorize check
         CertifiedResult authorizeResult =
                 serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
-                        groupName, consumeGroupInfo.getTopicSet(), consumeGroupInfo.getTopicConditions(), rmtAddress);
+                        groupName, consumeGroupInfo.getTopicSet(),
+                        consumeGroupInfo.getTopicConditions(), rmtAddress);
         if (!authorizeResult.result) {
             builder.setErrCode(authorizeResult.errCode);
             builder.setErrMsg(authorizeResult.errInfo);
@@ -1179,25 +1187,397 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         return builder.build();
     }
 
+    /**
+     * Client balance consumer register request with master
+     *
+     * @param request
+     * @param rmtAddress
+     * @param overtls
+     * @return register response
+     * @throws Throwable
+     */
     @Override
     public RegisterResponseM2CV2 consumerRegisterC2MV2(RegisterRequestC2MV2 request,
                                                        String rmtAddress,
                                                        boolean overtls) throws Throwable {
-        return null;
+        ProcessResult result = new ProcessResult();
+        final StringBuilder sBuffer = new StringBuilder(512);
+        RegisterResponseM2CV2.Builder builder = RegisterResponseM2CV2.newBuilder();
+        CertifiedResult certResult =
+                serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false);
+        if (!certResult.result) {
+            builder.setErrCode(certResult.errCode);
+            builder.setErrMsg(certResult.errInfo);
+            return builder.build();
+        }
+        ParamCheckResult paramCheckResult =
+                PBParameterUtils.checkClientId(request.getClientId(), sBuffer);
+        if (!paramCheckResult.result) {
+            builder.setErrCode(paramCheckResult.errCode);
+            builder.setErrMsg(paramCheckResult.errMsg);
+            return builder.build();
+        }
+        final String consumerId = (String) paramCheckResult.checkData;
+        paramCheckResult = PBParameterUtils.checkHostName(request.getHostName(), sBuffer);
+        if (!paramCheckResult.result) {
+            builder.setErrCode(paramCheckResult.errCode);
+            builder.setErrMsg(paramCheckResult.errMsg);
+            return builder.build();
+        }
+        //final String hostName = (String) paramCheckResult.checkData;
+        paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), sBuffer);
+        if (!paramCheckResult.result) {
+            builder.setErrCode(paramCheckResult.errCode);
+            builder.setErrMsg(paramCheckResult.errMsg);
+            return builder.build();
+        }
+        final String groupName = (String) paramCheckResult.checkData;
+        paramCheckResult =
+                PBParameterUtils.checkConsumerTopicList(
+                        request.getTopicListList(), sBuffer);
+        if (!paramCheckResult.result) {
+            builder.setErrCode(paramCheckResult.errCode);
+            builder.setErrMsg(paramCheckResult.errMsg);
+            return builder.build();
+        }
+        final Set<String> reqTopicSet = (Set<String>) paramCheckResult.checkData;
+        final Map<String, TreeSet<String>> reqTopicConditions =
+                DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
+        int sourceCount = request.getSourceCount();
+        int nodeId = request.getNodeId();
+        if (sourceCount > 0) {
+            if (nodeId < 0 || nodeId > (sourceCount - 1)) {
+                builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
+                builder.setErrMsg("Request nodeId value must be between in [0, sourceCount-1]!");
+                return builder.build();
+            }
+        }
+        final String clientJdkVer = request.hasJdkVersion()
+                ? request.getJdkVersion() : "";
+        final ConsumeType csmType = ConsumeType.CONSUME_CLIENT_REB;
+        OpsSyncInfo opsTaskInfo = new OpsSyncInfo();
+        if (request.hasOpsTaskInfo()) {
+            opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo());
+        }
+        // check master current status
+        checkNodeStatus(consumerId, sBuffer);
+        ClientSyncInfo clientSyncInfo = new ClientSyncInfo();
+        if (request.hasSubRepInfo()) {
+            clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo());
+        }
+        // build consumer object
+        ConsumerInfo inConsumerInfo =
+                new ConsumerInfo(consumerId, overtls, groupName, csmType,
+                        sourceCount, nodeId, reqTopicSet, reqTopicConditions,
+                        opsTaskInfo.getCsmFromMaxOffsetCtrlId(), clientSyncInfo);
+        // need removed for authorize center begin
+        if (!this.defMetaDataManager
+                .isConsumeTargetAuthorized(consumerId, groupName,
+                        reqTopicSet, reqTopicConditions, sBuffer, result)) {
+            if (sBuffer.length() > 0) {
+                logger.warn(sBuffer.toString());
+            }
+            builder.setErrCode(result.getErrCode());
+            builder.setErrMsg(result.getErrMsg());
+            return builder.build();
+        }
+        // need removed for authorize center end
+        // check resource require
+        paramCheckResult =
+                PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
+                        masterConfig, defMetaDataManager, brokerRunManager, sBuffer);
+        if (!paramCheckResult.result) {
+            builder.setErrCode(paramCheckResult.errCode);
+            builder.setErrMsg(paramCheckResult.errMsg);
+            return builder.build();
+        }
+        CertifiedResult authorizeResult =
+                serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
+                        groupName, reqTopicSet, reqTopicConditions, rmtAddress);
+        if (!authorizeResult.result) {
+            builder.setErrCode(authorizeResult.errCode);
+            builder.setErrMsg(authorizeResult.errInfo);
+            return builder.build();
+        }
+        Integer lid = null;
+        try {
+            lid = masterRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(consumerId), true);
+            if (!consumerHolder.addConsumer(inConsumerInfo, false, sBuffer, paramCheckResult)) {
+                builder.setErrCode(paramCheckResult.errCode);
+                builder.setErrMsg(paramCheckResult.errMsg);
+                return builder.build();
+            }
+            topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
+            heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId));
+        } catch (IOException e) {
+            logger.warn("Failed to lock.", e);
+        } finally {
+            if (lid != null) {
+                this.masterRowLock.releaseRowLock(lid);
+            }
+        }
+        ConsumeGroupInfo consumeGroupInfo =
+                consumerHolder.getConsumeGroupInfo(groupName);
+        if (consumeGroupInfo == null) {
+            logger.warn(sBuffer.append("[Illegal Process] ").append(consumerId)
+                    .append(" visit consume group(").append(groupName)
+                    .append(" info failure, null information").toString());
+            builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
+            builder.setErrMsg(sBuffer.toString());
+            sBuffer.delete(0, sBuffer.length());
+            return builder.build();
+        }
+        inConsumerInfo = consumeGroupInfo.getConsumerInfo(consumerId);
+        if (inConsumerInfo == null) {
+            logger.warn(sBuffer.append("[Illegal Process] ").append(consumerId)
+                    .append(" visit consume info failure, null information").toString());
+            builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
+            builder.setErrMsg(sBuffer.toString());
+            sBuffer.delete(0, sBuffer.length());
+            return builder.build();
+        }
+        Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>();
+        currentSubInfo.put(consumerId, topicPartSubMap);
+        Tuple2<Boolean, Set<Partition>> reportInfo = clientSyncInfo.getRepSubInfo();
+        if (reportInfo.getF0()) {
+            for (Partition info : reportInfo.getF1()) {
+                Map<String, Partition> partMap =
+                        topicPartSubMap.computeIfAbsent(info.getTopic(), k -> new HashMap<>());
+                partMap.put(info.getPartitionKey(), info);
+            }
+            printReportInfo(consumerId, null, topicPartSubMap, sBuffer);
+        }
+        logger.info(sBuffer.append("[Consumer Register] ")
+                .append(consumerId).append(", isOverTLS=").append(overtls)
+                .append(", clientJDKVer=").append(clientJdkVer).toString());
+        sBuffer.delete(0, sBuffer.length());
+        Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
+                brokerRunManager.getBrokerStaticInfo(overtls);
+        builder.setBrokerConfigId(brokerStaticInfo.getF0());
+        if (clientSyncInfo.getBrokerConfigId()
+                != brokerStaticInfo.getF0()) {
+            builder.addAllBrokerConfigList(brokerStaticInfo.getF1().values());
+        }
+        builder.setOpsTaskInfo(buildOpsTaskInfo(consumeGroupInfo, inConsumerInfo, opsTaskInfo));
+        builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
+        builder.setErrCode(TErrCodeConstants.SUCCESS);
+        builder.setErrMsg("OK!");
+        return builder.build();
     }
 
+    /**
+     * Client balance consumer heartbeat request with master
+     *
+     * @param request
+     * @param rmtAddress
+     * @param overtls
+     * @return heartbeat response
+     * @throws Throwable
+     */
     @Override
     public HeartResponseM2CV2 consumerHeartbeatC2MV2(HeartRequestC2MV2 request,
                                                      String rmtAddress,
                                                      boolean overtls) throws Throwable {
-        return null;
+        final StringBuilder strBuffer = new StringBuilder(512);
+        // response
+        HeartResponseM2CV2.Builder builder = HeartResponseM2CV2.newBuilder();
+        // identity valid
+        CertifiedResult certResult =
+                serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false);
+        if (!certResult.result) {
+            builder.setErrCode(certResult.errCode);
+            builder.setErrMsg(certResult.errInfo);
+            return builder.build();
+        }
+        ParamCheckResult paramCheckResult =
+                PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
+        if (!paramCheckResult.result) {
+            builder.setErrCode(paramCheckResult.errCode);
+            builder.setErrMsg(paramCheckResult.errMsg);
+            return builder.build();
+        }
+        final String clientId = (String) paramCheckResult.checkData;
+        paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
+        if (!paramCheckResult.result) {
+            builder.setErrCode(paramCheckResult.errCode);
+            builder.setErrMsg(paramCheckResult.errMsg);
+            return builder.build();
+        }
+        final String groupName = (String) paramCheckResult.checkData;
+        OpsSyncInfo opsTaskInfo = new OpsSyncInfo();
+        if (request.hasOpsTaskInfo()) {
+            opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo());
+        }
+        // check master current status
+        checkNodeStatus(clientId, strBuffer);
+        ClientSyncInfo clientSyncInfo = new ClientSyncInfo();
+        if (request.hasSubRepInfo()) {
+            clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo());
+        }
+        ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName);
+        if (consumeGroupInfo == null) {
+            builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+            builder.setErrMsg(strBuffer.append("Not found groupName ")
+                    .append(groupName).append(" in holder!").toString());
+            return builder.build();
+        }
+        ConsumerInfo inConsumerInfo =
+                consumeGroupInfo.getConsumerInfo(clientId);
+        if (inConsumerInfo == null) {
+            builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+            builder.setErrMsg(strBuffer.append("Not found client ").append(clientId)
+                    .append(" in group(").append(groupName).append(")").toString());
+            strBuffer.delete(0, strBuffer.length());
+            return builder.build();
+        }
+        // authorize check
+        CertifiedResult authorizeResult =
+                serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
+                        groupName, consumeGroupInfo.getTopicSet(),
+                        consumeGroupInfo.getTopicConditions(), rmtAddress);
+        if (!authorizeResult.result) {
+            builder.setErrCode(authorizeResult.errCode);
+            builder.setErrMsg(authorizeResult.errInfo);
+            return builder.build();
+        }
+        // heartbeat check
+        try {
+            heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId));
+        } catch (HeartbeatException e) {
+            builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+            builder.setErrMsg(strBuffer
+                    .append("Update consumer node exception:")
+                    .append(e.getMessage()).toString());
+            return builder.build();
+        }
+        inConsumerInfo.updClientReportInfo(opsTaskInfo.getCsmFromMaxOffsetCtrlId(),
+                clientSyncInfo.getLstAssignedTime(), clientSyncInfo.getTopicMetaInfoId());
+        Tuple2<Boolean, Set<Partition>> reportInfo = clientSyncInfo.getRepSubInfo();
+        if (reportInfo.getF0()) {
+            Map<String, Map<String, Partition>> curPartSubMap =
+                    currentSubInfo.get(clientId);
+            Map<String, Map<String, Partition>> newPartSubMap = new HashMap<>();
+            for (Partition info : reportInfo.getF1()) {
+                Map<String, Partition> partMap =
+                        newPartSubMap.computeIfAbsent(info.getTopic(), k -> new HashMap<>());
+                partMap.put(info.getPartitionKey(), info);
+            }
+            printReportInfo(clientId, curPartSubMap, newPartSubMap, strBuffer);
+            currentSubInfo.put(clientId, newPartSubMap);
+        }
+        Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
+                brokerRunManager.getBrokerStaticInfo(overtls);
+        builder.setBrokerConfigId(brokerStaticInfo.getF0());
+        if (clientSyncInfo.getBrokerConfigId()
+                != brokerStaticInfo.getF0()) {
+            builder.addAllBrokerConfigList(brokerStaticInfo.getF1().values());
+        }
+        Map<String, Map<String, Partition>> curPartMap = currentSubInfo.get(clientId);
+        if (inConsumerInfo.getLstAssignedTime() >= 0
+                && (curPartMap != null && !curPartMap.isEmpty())
+                && (System.currentTimeMillis() - inConsumerInfo.getLstAssignedTime()
+                > masterConfig.getMaxMetaForceUpdatePeriodMs())) {
+            Tuple2<Long, List<String>> topicMetaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
+            builder.setTopicMetaInfoId(topicMetaInfoTuple.getF0());
+            if (topicMetaInfoTuple.getF0() != clientSyncInfo.getTopicMetaInfoId()) {
+                builder.addAllTopicMetaInfoList(topicMetaInfoTuple.getF1());
+            }
+        }
+        builder.setOpsTaskInfo(buildOpsTaskInfo(consumeGroupInfo, inConsumerInfo, opsTaskInfo));
+        builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
+        builder.setErrCode(TErrCodeConstants.SUCCESS);
+        builder.setErrMsg("OK!");
+        return builder.build();
     }
 
+    /**
+     * Client balance consumer get partition meta information with master
+     *
+     * @param request
+     * @param rmtAddress
+     * @param overtls
+     * @return partition meta information response
+     * @throws Throwable
+     */
     @Override
     public GetPartMetaResponseM2C consumerGetPartMetaInfoC2M(GetPartMetaRequestC2M request,
                                                              String rmtAddress,
                                                              boolean overtls) throws Throwable {
-        return null;
+        StringBuilder strBuffer = new StringBuilder(512);
+        GetPartMetaResponseM2C.Builder builder = GetPartMetaResponseM2C.newBuilder();
+        CertifiedResult certResult =
+                serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false);
+        if (!certResult.result) {
+            builder.setErrCode(certResult.errCode);
+            builder.setErrMsg(certResult.errInfo);
+            return builder.build();
+        }
+        ParamCheckResult paramCheckResult =
+                PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
+        if (!paramCheckResult.result) {
+            builder.setErrCode(paramCheckResult.errCode);
+            builder.setErrMsg(paramCheckResult.errMsg);
+            return builder.build();
+        }
+        final String clientId = (String) paramCheckResult.checkData;
+        paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
+        if (!paramCheckResult.result) {
+            builder.setErrCode(paramCheckResult.errCode);
+            builder.setErrMsg(paramCheckResult.errMsg);
+            return builder.build();
+        }
+        final String groupName = (String) paramCheckResult.checkData;
+        final long brokerConfigId = request.getBrokerConfigId();
+        final long topicMetaInfoId = request.getTopicMetaInfoId();
+        checkNodeStatus(clientId, strBuffer);
+        // get control object
+        ConsumeGroupInfo consumeGroupInfo =
+                consumerHolder.getConsumeGroupInfo(groupName);
+        if (consumeGroupInfo == null) {
+            builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+            builder.setErrMsg(strBuffer.append("Not found groupName ")
+                    .append(groupName).append(" in holder!").toString());
+            strBuffer.delete(0, strBuffer.length());
+            return builder.build();
+        }
+        ConsumerInfo inConsumerInfo =
+                consumeGroupInfo.getConsumerInfo(clientId);
+        if (inConsumerInfo == null) {
+            builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+            builder.setErrMsg(strBuffer.append("Not found client ").append(clientId)
+                    .append(" in group(").append(groupName).append(")").toString());
+            strBuffer.delete(0, strBuffer.length());
+            return builder.build();
+        }
+        // heartbeat check
+        try {
+            heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId));
+        } catch (HeartbeatException e) {
+            builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+            builder.setErrMsg(strBuffer
+                    .append("Update consumer node exception:")
+                    .append(e.getMessage()).toString());
+            return builder.build();
+        }
+        Tuple2<Long, List<String>> topicMetaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
+        if (topicMetaInfoTuple.getF0() == TBaseConstants.META_VALUE_UNDEFINED) {
+            freshTopicMetaInfo(consumeGroupInfo, strBuffer);
+            topicMetaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
+        }
+        builder.setTopicMetaInfoId(topicMetaInfoTuple.getF0());
+        if (topicMetaInfoTuple.getF0() != topicMetaInfoId) {
+            builder.addAllTopicMetaInfoList(topicMetaInfoTuple.getF1());
+        }
+        Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
+                brokerRunManager.getBrokerStaticInfo(overtls);
+        builder.setBrokerConfigId(brokerStaticInfo.getF0());
+        if (brokerConfigId != brokerStaticInfo.getF0()) {
+            builder.addAllBrokerConfigList(brokerStaticInfo.getF1().values());
+        }
+        builder.setErrCode(TErrCodeConstants.SUCCESS);
+        builder.setErrMsg("OK!");
+        return builder.build();
     }
 
     /**
@@ -1237,7 +1617,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     public void run() {
         try {
             if (!this.stopped) {
-                Thread.sleep(masterConfig.getFirstBalanceDelayAfterStartMs());
                 this.balancerChore = startBalancerChore(this);
                 initialized = true;
                 while (!this.stopped) {
@@ -1258,44 +1637,60 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
      */
     private void balance(final TMaster tMaster) {
         final StringBuilder strBuffer = new StringBuilder(512);
-        final long rebalanceId = idGenerator.incrementAndGet();
+        final long balanceId = idGenerator.incrementAndGet();
         if (defMetaDataManager != null) {
-            logger.info(strBuffer.append("[Rebalance Start] ").append(rebalanceId)
+            logger.info(strBuffer.append("[Balance Start] ").append(balanceId)
                     .append(", isMaster=").append(defMetaDataManager.isSelfMaster())
                     .append(", isPrimaryNodeActive=")
                     .append(defMetaDataManager.isPrimaryNodeActive()).toString());
         } else {
-            logger.info(strBuffer.append("[Rebalance Start] ").append(rebalanceId)
+            logger.info(strBuffer.append("[Balance Start] ").append(balanceId)
                     .append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString());
         }
         strBuffer.delete(0, strBuffer.length());
-        int curDoingTasks = this.curBalanceParal.get();
+        // process client-balance
+        processClientBalanceMetaInfo(balanceId, strBuffer);
+        // process server-balance
+        processServerBalance(tMaster, balanceId, strBuffer);
+        logger.info(strBuffer.append("[Balance End] ").append(balanceId).toString());
+    }
+
+    private void processServerBalance(TMaster tMaster,
+                                      long balanceId,
+                                      StringBuilder sBuffer) {
+        int curDoingTasks = this.curSvrBalanceParal.get();
         if (curDoingTasks > 0) {
-            logger.info(strBuffer.append("[Rebalance End] ").append(rebalanceId)
-                    .append(", the previous rebalance has ")
-                    .append(curDoingTasks).append(" task(s) in progress!").toString());
+            logger.info(sBuffer.append("[Svr-Balance End] ").append(balanceId)
+                    .append(" the Server-Balance has ").append(curDoingTasks)
+                    .append(" task(s) in progress!").toString());
+            sBuffer.delete(0, sBuffer.length());
             return;
         }
         final boolean isStartBalance = startupBalance;
         List<String> groupsNeedToBalance = isStartBalance
-                ? consumerHolder.getAllServerBalanceGroups() : getNeedToBalanceGroups(strBuffer);
-        strBuffer.delete(0, strBuffer.length());
-        if (!groupsNeedToBalance.isEmpty()) {
-            // set parallel rebalance signal
-            curBalanceParal.set(masterConfig.getRebalanceParallel());
+                ? consumerHolder.getAllServerBalanceGroups() : getNeedToBalanceGroups(sBuffer);
+        sBuffer.delete(0, sBuffer.length());
+        int balanceTaskCnt = groupsNeedToBalance.size();
+        if (balanceTaskCnt > 0) {
             // calculate process count
-            int unitNum = (groupsNeedToBalance.size() + masterConfig.getRebalanceParallel() - 1)
+            int unitNum = (balanceTaskCnt + masterConfig.getRebalanceParallel() - 1)
                     / masterConfig.getRebalanceParallel();
-            // start processer to do reblance;
+            // start processor to do balance;
             int startIndex = 0;
             int endIndex = 0;
+            // set parallel balance signal
+            curSvrBalanceParal.set(masterConfig.getRebalanceParallel());
             for (int i = 0; i < masterConfig.getRebalanceParallel(); i++) {
-                // get groups need to rebalance
-                startIndex = Math.min((i) * unitNum, groupsNeedToBalance.size());
-                endIndex = Math.min((i + 1) * unitNum, groupsNeedToBalance.size());
+                // get groups need to balance
+                startIndex = Math.min((i) * unitNum, balanceTaskCnt);
+                endIndex = Math.min((i + 1) * unitNum, balanceTaskCnt);
                 final List<String> subGroups = groupsNeedToBalance.subList(startIndex, endIndex);
-                // execute rebalance
-                this.executor.execute(new Runnable() {
+                if (subGroups.isEmpty()) {
+                    curSvrBalanceParal.decrementAndGet();
+                    continue;
+                }
+                // execute balance
+                this.svrExecutor.execute(new Runnable() {
                     @Override
                     public void run() {
                         try {
@@ -1304,11 +1699,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                             }
                             // first process reset rebalance task;
                             try {
-                                tMaster.processResetbalance(rebalanceId,
+                                tMaster.processResetbalance(balanceId,
                                         isStartBalance, subGroups);
                             } catch (Throwable e) {
                                 logger.warn(new StringBuilder(1024)
-                                        .append("[Rebalance processor] Error during reset-reb,")
+                                        .append("[Svr-Balance processor] Error during reset-reb,")
                                         .append("the groups that may be affected are ")
                                         .append(subGroups).append(",error is ")
                                         .append(e).toString());
@@ -1316,31 +1711,148 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                             if (tMaster.isStopped()) {
                                 return;
                             }
-                            // second process normal rebalance task;
+                            // second process normal balance task;
                             try {
-                                tMaster.processRebalance(rebalanceId,
+                                tMaster.processRebalance(balanceId,
                                         isStartBalance, subGroups);
                             } catch (Throwable e) {
                                 logger.warn(new StringBuilder(1024)
-                                        .append("[Rebalance processor] Error during normal-reb,")
+                                        .append("[Svr-Balance processor] Error during normal-reb,")
                                         .append("the groups that may be affected are ")
                                         .append(subGroups).append(",error is ")
                                         .append(e).toString());
                             }
                         } catch (Throwable e) {
-                            logger.warn("[Rebalance processor] Error during process", e);
+                            logger.warn("[Svr-Balance processor] Error during process", e);
                         } finally {
-                            curBalanceParal.decrementAndGet();
+                            curSvrBalanceParal.decrementAndGet();
                         }
                     }
                 });
             }
         }
         startupBalance = false;
-        logger.info(strBuffer.append("[Rebalance End] ").append(rebalanceId).toString());
+        logger.info(sBuffer.append("[Svr-Balance End] ").append(balanceId).toString());
+        sBuffer.delete(0, sBuffer.length());
+    }
+
+    /**
+     * Refresh the topic meta information of all client-balance consume groups.
+     *
+     * <p>The groups returned by consumerHolder.getAllClientBalanceGroups() are cut into
+     * at most masterConfig.getRebalanceParallel() slices and every non-empty slice is
+     * handed to cltExecutor. curCltBalanceParal counts the slices that are not finished
+     * yet; while it is greater than zero a new round is skipped.
+     *
+     * @param balanceId  id of the current balance round, only used in the log output
+     * @param sBuffer    reusable string buffer, emptied again before a normal return
+     */
+    private void processClientBalanceMetaInfo(long balanceId, StringBuilder sBuffer) {
+        int curDoingTasks = this.curCltBalanceParal.get();
+        if (curDoingTasks > 0) {
+            logger.info(sBuffer.append("[Clt-Balance End] ").append(balanceId)
+                    .append(" the Client-Balance has ").append(curDoingTasks)
+                    .append(" task(s) in progress!").toString());
+            sBuffer.delete(0, sBuffer.length());
+            return;
+        }
+        List<String> clientGroups = consumerHolder.getAllClientBalanceGroups();
+        if (!clientGroups.isEmpty()) {
+            // read the parallelism once, so the counter, the slice size
+            // and the loop bound are always derived from the same value
+            final int parallelCnt = masterConfig.getRebalanceParallel();
+            final int balanceTaskCnt = clientGroups.size();
+            // calculate the number of groups handled by one task
+            final int unitNum = (balanceTaskCnt + parallelCnt - 1) / parallelCnt;
+            // set parallel balance signal
+            curCltBalanceParal.set(parallelCnt);
+            for (int i = 0; i < parallelCnt; i++) {
+                // get groups need to balance
+                int startIndex = Math.min(i * unitNum, balanceTaskCnt);
+                int endIndex = Math.min((i + 1) * unitNum, balanceTaskCnt);
+                final List<String> subGroups = clientGroups.subList(startIndex, endIndex);
+                if (subGroups.isEmpty()) {
+                    // nothing to submit for this slice, release it immediately
+                    curCltBalanceParal.decrementAndGet();
+                    continue;
+                }
+                try {
+                    // execute balance
+                    this.cltExecutor.execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                StringBuilder sBuffer2 = new StringBuilder(512);
+                                for (String groupName : subGroups) {
+                                    ConsumeGroupInfo consumeGroupInfo =
+                                            consumerHolder.getConsumeGroupInfo(groupName);
+                                    if (consumeGroupInfo == null) {
+                                        continue;
+                                    }
+                                    freshTopicMetaInfo(consumeGroupInfo, sBuffer2);
+                                }
+                            } catch (Throwable e) {
+                                logger.warn("[Clt-Balance processor] Error during process", e);
+                            } finally {
+                                curCltBalanceParal.decrementAndGet();
+                            }
+                        }
+                    });
+                } catch (RuntimeException e) {
+                    // the task was not accepted (e.g. the executor is shut down):
+                    // slices i..parallelCnt-1 will never run, so release them here,
+                    // otherwise the counter stays positive and blocks all later rounds
+                    curCltBalanceParal.addAndGet(i - parallelCnt);
+                    throw e;
+                }
+            }
+        }
+        logger.info(sBuffer.append("[Clt-Balance End] ").append(balanceId).toString());
+        sBuffer.delete(0, sBuffer.length());
     }
 
-    // process unReset group rebalance
+    /**
+     * Recompute the topic meta information of a consume group and store it in the group.
+     *
+     * @param consumeGroupInfo  the group whose topic meta information is refreshed
+     * @param sBuffer           string buffer passed on to getTopicConfigInfo()
+     */
+    public void freshTopicMetaInfo(ConsumeGroupInfo consumeGroupInfo, StringBuilder sBuffer) {
+        consumeGroupInfo.updCsmTopicMetaInfo(
+                getTopicConfigInfo(consumeGroupInfo, sBuffer));
+    }
+
+    /**
+     * Build the deployment description of every topic subscribed by a consume group.
+     *
+     * <p>For each topic one string is produced: the topic name, SEGMENT_SEP, and then
+     * one item per deployed broker separated by ARRAY_SEP. An item consists of
+     * brokerId, numTopicStores, numPartitions and a status flag joined by ATTR_SEP.
+     * The status flag is 1 when the broker's published topic info exists, accepts
+     * subscription and the topic is not in the group's disabled topic set, else 0.
+     *
+     * @param groupInfo  the consume group to query
+     * @param sBuffer    work buffer; expected to be empty on entry, it is emptied
+     *                   again after each processed topic
+     * @return           map from topic name to its description, empty if the group
+     *                   has no topics or no deployment is found
+     */
+    private Map<String, String> getTopicConfigInfo(ConsumeGroupInfo groupInfo,
+                                                   StringBuilder sBuffer) {
+        Map<String, String> result = new HashMap<>();
+        if (groupInfo.getTopicSet() == null || groupInfo.getTopicSet().isEmpty()) {
+            return result;
+        }
+        Map<String, List<TopicDeployEntity>> topicDeployInfoMap =
+                defMetaDataManager.getTopicConfMapByTopicAndBrokerIds(
+                        groupInfo.getTopicSet(), null);
+        if (topicDeployInfoMap == null || topicDeployInfoMap.isEmpty()) {
+            return result;
+        }
+        Set<String> fbdTopicSet =
+                defMetaDataManager.getDisableTopicByGroupName(groupInfo.getGroupName());
+        for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) {
+            if (entry == null || entry.getValue() == null) {
+                continue;
+            }
+            int count = 0;
+            for (TopicDeployEntity deployInfo : entry.getValue()) {
+                TopicInfo topicInfo =
+                        brokerRunManager.getPubBrokerTopicInfo(
+                                deployInfo.getBrokerId(), deployInfo.getTopicName());
+                // the status belongs to this broker only, so it is evaluated for every
+                // deployment instead of being carried over from a previous broker
+                int statusId = (topicInfo != null
+                        && topicInfo.isAcceptSubscribe()
+                        && !fbdTopicSet.contains(topicInfo.getTopic())) ? 1 : 0;
+                if (count++ == 0) {
+                    sBuffer.append(entry.getKey()).append(TokenConstants.SEGMENT_SEP);
+                } else {
+                    sBuffer.append(TokenConstants.ARRAY_SEP);
+                }
+                sBuffer.append(deployInfo.getBrokerId()).append(TokenConstants.ATTR_SEP)
+                        .append(deployInfo.getNumTopicStores()).append(TokenConstants.ATTR_SEP)
+                        .append(deployInfo.getNumPartitions()).append(TokenConstants.ATTR_SEP)
+                        .append(statusId);
+            }
+            if (count > 0) {
+                result.put(entry.getKey(), sBuffer.toString());
+            }
+            sBuffer.delete(0, sBuffer.length());
+        }
+        return result;
+    }
+
+    // process unReset group balance
     public void processRebalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
         // #lizard forgives
         Map<String, Map<String, List<Partition>>> finalSubInfoMap = null;
@@ -1733,6 +2245,158 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         return outClientConfig;
     }
 
+    /**
+     * Assemble the OpsTaskInfo builder for one consumer.
+     *
+     * <p>The consume-from-max-offset control id is set when the group has one defined
+     * and it differs from the id held by the consumer. Flow-control check ids are set
+     * when the respective flow control is enabled; the rule text itself is only
+     * attached when the check id carried in opsTaskInfo differs from the current one.
+     *
+     * @param groupInfo    the consume group of the consumer
+     * @param nodeInfo     the consumer the information is built for
+     * @param opsTaskInfo  the sync information to compare the check ids against
+     * @return             the filled builder
+     */
+    private ClientMaster.OpsTaskInfo.Builder buildOpsTaskInfo(ConsumeGroupInfo groupInfo,
+                                                              ConsumerInfo nodeInfo,
+                                                              OpsSyncInfo opsTaskInfo) {
+        final ClientMaster.OpsTaskInfo.Builder taskBuilder =
+                ClientMaster.OpsTaskInfo.newBuilder();
+        // consume-from-max-offset control
+        final long groupCtrlId = groupInfo.getCsmFromMaxCtrlId();
+        final boolean ctrlIdDefined =
+                (groupCtrlId != TBaseConstants.META_VALUE_UNDEFINED);
+        if (ctrlIdDefined
+                && nodeInfo.getCsmFromMaxOffsetCtrlId() != groupCtrlId) {
+            taskBuilder.setCsmFrmMaxOffsetCtrlId(groupCtrlId);
+        }
+        final ClusterSettingEntity clusterSetting =
+                defMetaDataManager.getClusterDefSetting(false);
+        final GroupResCtrlEntity groupCtrlConf =
+                defMetaDataManager.confGetGroupResCtrlConf(nodeInfo.getGroupName());
+        // cluster default flow control
+        if (clusterSetting.enableFlowCtrl()) {
+            taskBuilder.setDefFlowCheckId(clusterSetting.getSerialId());
+            if (opsTaskInfo.getDefFlowChkId() != clusterSetting.getSerialId()) {
+                taskBuilder.setDefFlowControlInfo(clusterSetting.getGloFlowCtrlRuleInfo());
+            }
+        }
+        // group level flow control
+        if (groupCtrlConf != null) {
+            if (groupCtrlConf.isFlowCtrlEnable()) {
+                taskBuilder.setGroupFlowCheckId(groupCtrlConf.getSerialId());
+                taskBuilder.setQryPriorityId(groupCtrlConf.getQryPriorityId());
+                if (opsTaskInfo.getGroupFlowChkId() != groupCtrlConf.getSerialId()) {
+                    taskBuilder.setGroupFlowControlInfo(groupCtrlConf.getFlowCtrlInfo());
+                }
+            }
+        }
+        return taskBuilder;
+    }
+
+    /**
+     * Log the difference between two partition subscription maps of a consumer.
+     *
+     * <p>Partitions found only in newPartSubMap are reported as added, partitions found
+     * only in curPartSubMa are reported as deleted. All items are appended to sBuffer
+     * through printReportItemInfo() and written as a single info log line; nothing is
+     * logged when the buffer is still empty after the comparison.
+     *
+     * @param consumerId     the consumer id printed in the log header
+     * @param curPartSubMa   the currently recorded subscription, may be null or empty
+     *                       (presumably topic -> partition key -> partition; confirm with caller)
+     * @param newPartSubMap  the newly reported subscription, may be null or empty
+     * @param sBuffer        work buffer; expected to be empty on entry since any existing
+     *                       content would be logged as well, it is emptied before return
+     */
+    private void printReportInfo(String consumerId,
+                                 Map<String, Map<String, Partition>> curPartSubMa,
+                                 Map<String, Map<String, Partition>> newPartSubMap,
+                                 StringBuilder sBuffer) {
+        boolean printHeader = true;
+        if (curPartSubMa == null || curPartSubMa.isEmpty()) {
+            // nothing recorded so far: every reported partition is an addition
+            if (newPartSubMap != null && !newPartSubMap.isEmpty()) {
+                for (Map<String, Partition> newPartMap : newPartSubMap.values()) {
+                    if (newPartMap == null || newPartMap.isEmpty()) {
+                        continue;
+                    }
+                    for (Partition info : newPartMap.values()) {
+                        printHeader = printReportItemInfo(consumerId,
+                                info, true, printHeader, sBuffer);
+                    }
+                }
+            }
+        } else {
+            if (newPartSubMap == null || newPartSubMap.isEmpty()) {
+                // nothing reported: every recorded partition is a deletion
+                for (Map<String, Partition> curPartMap : curPartSubMa.values()) {
+                    if (curPartMap == null || curPartMap.isEmpty()) {
+                        continue;
+                    }
+                    for (Partition info : curPartMap.values()) {
+                        printHeader = printReportItemInfo(consumerId,
+                                info, false, printHeader, sBuffer);
+                    }
+                }
+            } else {
+                // both sides have content: first walk the recorded keys ...
+                for (Map.Entry<String, Map<String, Partition>> curEntry
+                        : curPartSubMa.entrySet()) {
+                    if (curEntry == null || curEntry.getKey() == null) {
+                        continue;
+                    }
+                    Map<String, Partition> newPartMap =
+                            newPartSubMap.get(curEntry.getKey());
+                    if (newPartMap == null || newPartMap.isEmpty()) {
+                        // key is no longer reported: all its partitions are deleted
+                        Map<String, Partition> curPartMap = curEntry.getValue();
+                        if (curPartMap == null || curPartMap.isEmpty()) {
+                            continue;
+                        }
+                        for (Partition info : curPartMap.values()) {
+                            printHeader = printReportItemInfo(consumerId,
+                                    info, false, printHeader, sBuffer);
+                        }
+                    } else {
+                        Map<String, Partition> curPartMap = curEntry.getValue();
+                        if (curPartMap == null || curPartMap.isEmpty()) {
+                            // key recorded without partitions: all reported ones are added
+                            for (Partition info : newPartMap.values()) {
+                                printHeader = printReportItemInfo(consumerId,
+                                        info, true, printHeader, sBuffer);
+                            }
+                        } else {
+                            // recorded partitions missing in the report are deleted
+                            for (Map.Entry<String, Partition> curPartEntry
+                                    : curPartMap.entrySet()) {
+                                if (curPartEntry == null
+                                        || curPartEntry.getKey() == null
+                                        || curPartEntry.getValue() == null) {
+                                    continue;
+                                }
+                                if (newPartMap.get(curPartEntry.getKey()) == null) {
+                                    printHeader = printReportItemInfo(consumerId,
+                                            curPartEntry.getValue(), false, printHeader, sBuffer);
+                                }
+                            }
+                            // reported partitions missing in the record are added
+                            for (Map.Entry<String, Partition> newPartEntry
+                                    : newPartMap.entrySet()) {
+                                if (newPartEntry == null
+                                        || newPartEntry.getKey() == null
+                                        || newPartEntry.getValue() == null) {
+                                    continue;
+                                }
+                                if (curPartMap.get(newPartEntry.getKey()) == null) {
+                                    printHeader = printReportItemInfo(consumerId,
+                                            newPartEntry.getValue(), true, printHeader, sBuffer);
+                                }
+                            }
+                        }
+                    }
+                }
+                // ... then the reported keys that have no recorded value at all
+                for (Map.Entry<String, Map<String, Partition>> newEntry
+                        : newPartSubMap.entrySet()) {
+                    if (newEntry == null || newEntry.getKey() == null) {
+                        continue;
+                    }
+                    if (curPartSubMa.get(newEntry.getKey()) != null) {
+                        continue;
+                    }
+                    Map<String, Partition> newPartMap = newEntry.getValue();
+                    if (newPartMap == null || newPartMap.isEmpty()) {
+                        continue;
+                    }
+                    for (Partition info : newPartMap.values()) {
+                        printHeader = printReportItemInfo(consumerId,
+                                info, true, printHeader, sBuffer);
+                    }
+                }
+            }
+        }
+        // close the bracket opened by the header and emit the collected line
+        if (sBuffer.length() > 0) {
+            logger.info(sBuffer.append("]").toString());
+        }
+        sBuffer.delete(0, sBuffer.length());
+
+    }
+
+    /**
+     * Append one partition change item to the report buffer.
+     *
+     * @param consumerId   the consumer id written into the header
+     * @param info         the partition to report; a null value is ignored
+     * @param isAdd        true to tag the item as added, false to tag it as deleted
+     * @param printHeader  whether the report header still has to be written
+     * @param sBuffer      the buffer the report is appended to
+     * @return             printHeader unchanged if info is null, otherwise false
+     */
+    private boolean printReportItemInfo(String consumerId, Partition info,
+                                        boolean isAdd, boolean printHeader,
+                                        StringBuilder sBuffer) {
+        if (info != null) {
+            if (printHeader) {
+                // the header is written once, in front of the first item
+                sBuffer.append("[SubInfo Report] client ")
+                        .append(consumerId).append(" report the info: [");
+            }
+            sBuffer.append(isAdd ? "[added]" : "[deleted]")
+                    .append(info.toString()).append(", ");
+            return false;
+        }
+        return printHeader;
+    }
+
     /**
      * build cluster configure info
      *
@@ -1795,7 +2459,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         try {
             webServer.stop();
             rpcServiceFactory.destroy();
-            executor.shutdown();
+            svrExecutor.shutdown();
+            cltExecutor.shutdown();
             stopChores();
             heartbeatManager.stop();
             zkOffsetStorage.close();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
index 0238109..a13c8b2 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
 import org.slf4j.Logger;
@@ -264,6 +265,44 @@ public class ConsumeGroupInfo {
         }
     }
 
+    /**
+     * Set the consume-from-max control id of the group to the current system time.
+     */
+    public void updCsmFromMaxCtrlId() {
+        final long nowMs = System.currentTimeMillis();
+        this.csmCtrlId.set(nowMs);
+    }
+
+    /**
+     * Replace the cached topic meta information with a freshly built snapshot.
+     *
+     * <p>The refresh time is always recorded. A null or empty snapshot leaves the
+     * cache untouched. Otherwise the cache is rewritten, and the meta info id is
+     * renewed, only if the topic set or any topic's description differs from the
+     * cached content. Topics absent from the snapshot are removed from the cache,
+     * so stale entries are no longer returned and an unchanged snapshot does not
+     * renew the id on every refresh.
+     *
+     * @param result  topic name -> topic description of the latest snapshot
+     */
+    public void updCsmTopicMetaInfo(Map<String, String> result) {
+        lastMetaInfoFreshTime.set(System.currentTimeMillis());
+        if (result == null || result.isEmpty()) {
+            return;
+        }
+        Set<String> newTopics = result.keySet();
+        Set<String> curTopics = topicMetaInfoMap.keySet();
+        boolean isChanged = (newTopics.size() != curTopics.size()
+                || !newTopics.containsAll(curTopics));
+        if (!isChanged) {
+            // same topic set, compare the description of each topic
+            for (Map.Entry<String, String> newEntry : result.entrySet()) {
+                String newConfig = newEntry.getValue();
+                if (newConfig == null) {
+                    continue;
+                }
+                if (!newConfig.equals(topicMetaInfoMap.get(newEntry.getKey()))) {
+                    isChanged = true;
+                    break;
+                }
+            }
+        }
+        if (!isChanged) {
+            return;
+        }
+        // drop the topics which are not part of the new snapshot; the key set is a
+        // view of the map, so the removal is applied to the map itself
+        curTopics.retainAll(newTopics);
+        for (Map.Entry<String, String> newEntry : result.entrySet()) {
+            topicMetaInfoMap.put(newEntry.getKey(), newEntry.getValue());
+        }
+        topicMetaInfoId.set(System.currentTimeMillis());
+    }
+
     public boolean isUnReadyServerBalance() {
         return (consumeType == ConsumeType.CONSUME_BAND
                 && notAllocate.get()
@@ -411,6 +450,25 @@ public class ConsumeGroupInfo {
         return this.topicConditions;
     }
 
+    /**
+     * Get the current consume-from-max control id of the group.
+     *
+     * @return the value held by csmCtrlId
+     */
+    public long getCsmFromMaxCtrlId() {
+        final long ctrlId = this.csmCtrlId.get();
+        return ctrlId;
+    }
+
+    /**
+     * Get the topic meta information cached for the group.
+     *
+     * @return a tuple of the current meta info id and the list of all
+     *         non-blank topic descriptions held in the cache
+     */
+    public Tuple2<Long, List<String>> getTopicMetaInfo() {
+        final List<String> validMetaInfos = new ArrayList<>();
+        for (String item : topicMetaInfoMap.values()) {
+            // blank descriptions carry no information, leave them out
+            if (!TStringUtils.isBlank(item)) {
+                validMetaInfos.add(item);
+            }
+        }
+        return new Tuple2<>(topicMetaInfoId.get(), validMetaInfos);
+    }
+
+    /**
+     * Get the holder of the last topic meta information refresh time.
+     *
+     * @return the AtomicLong instance itself, not a copy of its value
+     */
+    public AtomicLong getLastMetaInfoFreshTime() {
+        return this.lastMetaInfoFreshTime;
+    }
+
     public List<ConsumerInfo> getConsumerInfoList() {
         try {
             csmInfoRWLock.readLock().lock();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java
deleted file mode 100644
index 766af32..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
-
-import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
-
-public class TopicConfigInfo implements Comparable<TopicConfigInfo> {
-    private final int brokerId;
-    private final String topicName;
-    private final int numStore;
-    private final int numPart;
-    private boolean acceptSub;
-
-    public TopicConfigInfo(TopicDeployEntity topicDeployEntity) {
-        this.brokerId = topicDeployEntity.getBrokerId();
-        this.topicName = topicDeployEntity.getTopicName();
-        this.numStore = topicDeployEntity.getNumTopicStores();
-        this.numPart = topicDeployEntity.getNumPartitions();
-        this.acceptSub = false;
-    }
-
-    public int getBrokerId() {
-        return brokerId;
-    }
-
-    public String getTopicName() {
-        return topicName;
-    }
-
-    public int getNumStore() {
-        return numStore;
-    }
-
-    public void setAcceptSub(boolean acceptSub) {
-        this.acceptSub = acceptSub;
-    }
-
-    public boolean isAcceptSub() {
-        return acceptSub;
-    }
-
-    public int getNumPart() {
-        return numPart;
-    }
-
-    @Override
-    public int compareTo(TopicConfigInfo o) {
-        return Integer.compare(this.brokerId, o.brokerId);
-    }
-}