You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/19 07:54:38 UTC

[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-602] Add replacement processing after metadata changes (#468)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
     new 4f68e8a  [INLONG-602] Add replacement processing after metadata changes (#468)
4f68e8a is described below

commit 4f68e8a978a1e849a2acbc23ec29d25fd5b012e1
Author: gosonzhang <go...@apache.org>
AuthorDate: Wed May 19 15:54:30 2021 +0800

    [INLONG-602] Add replacement processing after metadata changes (#468)
---
 .../server/common/heartbeat/HeartbeatManager.java  |  43 +-
 .../server/common/paramcheck/PBParameterUtils.java |  16 +-
 .../server/common/utils/WebParameterUtils.java     |  18 +-
 .../org/apache/tubemq/server/master/TMaster.java   | 826 +++-----------------
 .../server/master/balance/DefaultLoadBalancer.java |  62 +-
 .../tubemq/server/master/balance/LoadBalancer.java |  12 +-
 .../server/master/metamanage/MetaDataManager.java  | 486 ++----------
 .../impl/bdbimpl/BdbBrokerConfigMapperImpl.java    |   4 +-
 ...{BrokerInfoHolder.java => BrokerAbnHolder.java} |  79 +-
 .../nodemanage/nodebroker/BrokerPSInfoHolder.java  |   9 +-
 .../nodemanage/nodebroker/BrokerRunManager.java    |  71 +-
 .../nodemanage/nodebroker/BrokerRunStatusInfo.java |  62 +-
 .../nodemanage/nodebroker/BrokerSyncData.java      |  38 +-
 .../nodebroker/BrokerSyncStatusInfo.java           | 843 ---------------------
 .../nodemanage/nodebroker/BrokerTopicInfoView.java | 131 ++--
 .../nodemanage/nodebroker/DefBrokerRunManager.java | 504 ++++++++++++
 .../nodemanage/nodebroker/TopicPSInfoManager.java  | 150 ----
 .../server/master/web/action/screen/Master.java    |  15 +-
 .../web/action/screen/config/BrokerList.java       |  12 +-
 .../master/web/handler/WebBrokerConfHandler.java   |  56 +-
 .../master/web/handler/WebMasterInfoHandler.java   |  12 +-
 .../master/web/handler/WebTopicDeployHandler.java  |  30 +-
 .../tubemq/server/common/HeartbeatManagerTest.java |   2 +-
 .../nodebroker/BrokerInfoHolderTest.java           |  47 --
 24 files changed, 1039 insertions(+), 2489 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
index c000b31..dc83412 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
@@ -23,8 +23,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+
+import org.apache.tubemq.corebase.TErrCodeConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.exception.HeartbeatException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -163,10 +166,12 @@ public class HeartbeatManager {
      * Register a node as broker.
      *
      * @param nodeId the id of a node to be registered.
+     * @param createId  broker run-info block id
      * @return the timeout info for the registered node
      */
-    public TimeoutInfo regBrokerNode(final String nodeId) {
-        return this.brokerRegMap.put(nodeId, new TimeoutInfo(this.brokerTimeoutDlt));
+    public TimeoutInfo regBrokerNode(String nodeId, String createId) {
+        return this.brokerRegMap.put(nodeId,  // the value returned by put() is handed back to the caller
+                new TimeoutInfo(createId, this.brokerTimeoutDlt));  // createId is stored with the timeout
     }
 
     /**
@@ -208,10 +213,18 @@ public class HeartbeatManager {
      * Unregister a node from the broker
      *
      * @param nodeId the id of node to be unregistered
-     * @return the timeout of the node
+     * @return true if the node is not registered or was removed, false if createId does not match
      */
-    public TimeoutInfo unRegBrokerNode(final String nodeId) {
-        return brokerRegMap.remove(nodeId);
+    public boolean unRegBrokerNode(String nodeId, String createId) {
+        TimeoutInfo timeoutInfo = brokerRegMap.get(nodeId);
+        if (timeoutInfo == null) {
+            return true;  // nothing is registered under nodeId; reported as success
+        }
+        if (!createId.equals(timeoutInfo.getSecondKey())) {
+            return false;  // createId differs from the entry's second key; the entry is kept
+        }
+        brokerRegMap.remove(nodeId);  // removed value is not needed, so it is not stored
+        return true;
     }
 
     /**
@@ -240,14 +253,26 @@ public class HeartbeatManager {
      * @param nodeId the id of node to be updated
      * @throws HeartbeatException if the timeout info of the node is not found
      */
-    public void updBrokerNode(final String nodeId) throws HeartbeatException {
+    public boolean updBrokerNode(String nodeId, String createId,
+                                 StringBuilder sBuffer, ProcessResult result) {
         TimeoutInfo timeoutInfo = brokerRegMap.get(nodeId);
         if (timeoutInfo == null) {
-            throw new HeartbeatException(new StringBuilder(512)
-                    .append("Invalid node id:").append(nodeId)
-                    .append(", you have to append node first!").toString());
+            result.setFailResult(TErrCodeConstants.HB_NO_NODE,
+                    sBuffer.append("Invalid node id:").append(nodeId)
+                            .append(", you have to append node first!").toString());
+            sBuffer.delete(0, sBuffer.length());  // leave the shared buffer empty for the caller
+            return result.isSuccess();
+        }
+        if (!createId.equals(timeoutInfo.getSecondKey())) {  // fail only when createId does NOT match the entry
+            result.setFailResult(TErrCodeConstants.HB_NO_NODE,
+                    sBuffer.append("Invalid node block id:").append(createId)
+                            .append(", you have to append node first!").toString());
+            sBuffer.delete(0, sBuffer.length());  // leave the shared buffer empty for the caller
+            return result.isSuccess();
         }
         timeoutInfo.updTimeoutTime(this.brokerTimeoutDlt);
+        result.setSuccResult(null);
+        return result.isSuccess();
     }
 
     /**
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index 71b62d5..576b905 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -36,7 +36,7 @@ import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.MasterConfig;
 import org.apache.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -181,10 +181,10 @@ public class PBParameterUtils {
     }
 
     public static ParamCheckResult checkConsumerInputInfo(ConsumerInfo inConsumerInfo,
-                                                          final MasterConfig masterConfig,
-                                                          final MetaDataManager defMetaDataManager,
-                                                          final TopicPSInfoManager topicPSInfoManager,
-                                                          final StringBuilder strBuffer) throws Exception {
+                                                          MasterConfig masterConfig,
+                                                          MetaDataManager defMetaDataManager,
+                                                          BrokerRunManager brokerRunManager,
+                                                          StringBuilder strBuffer) throws Exception {
         ParamCheckResult retResult = new ParamCheckResult();
         if (!inConsumerInfo.isRequireBound()) {
             retResult.setCheckData(inConsumerInfo);
@@ -217,7 +217,8 @@ public class PBParameterUtils {
         int allowRate = (offsetResetGroupEntity != null
                 && offsetResetGroupEntity.getAllowedBrokerClientRate() > 0)
                 ? offsetResetGroupEntity.getAllowedBrokerClientRate() : masterConfig.getMaxGroupBrokerConsumeRate();
-        int maxBrokerCount = topicPSInfoManager.getTopicMaxBrokerCount(inConsumerInfo.getTopicSet());
+        int maxBrokerCount =
+                brokerRunManager.getSubTopicMaxBrokerCount(inConsumerInfo.getTopicSet());
         int curBClientRate = (int) Math.floor(maxBrokerCount / inConsumerInfo.getSourceCount());
         if (curBClientRate > allowRate) {
             int minClientCnt = (int) (maxBrokerCount / allowRate);
@@ -412,7 +413,7 @@ public class PBParameterUtils {
         }
         String tmpValue = brokerId.trim();
         try {
-            Integer.parseInt(tmpValue);
+            retResult.setCheckData(Integer.parseInt(tmpValue));
         } catch (Throwable e) {
             retResult.setCheckResult(false,
                     TErrCodeConstants.BAD_REQUEST,
@@ -420,7 +421,6 @@ public class PBParameterUtils {
             strBuffer.delete(0, strBuffer.length());
             return retResult;
         }
-        retResult.setCheckData(tmpValue);
         return retResult;
     }
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 876fcca..db93e04 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -48,7 +48,8 @@ import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
 import org.apache.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+
+
 
 
 public class WebParameterUtils {
@@ -1169,21 +1170,6 @@ public class WebParameterUtils {
         return true;
     }
 
-    public static boolean checkBrokerInOfflining(int brokerId,
-                                                 int manageStatus,
-                                                 MetaDataManager metaManager) {
-        BrokerSyncStatusInfo brokerSyncStatusInfo =
-                metaManager.getBrokerRunSyncStatusInfo(brokerId);
-        if ((brokerSyncStatusInfo != null)
-                && (brokerSyncStatusInfo.isBrokerRegister())) {
-            if ((manageStatus == TStatusConstants.STATUS_MANAGE_OFFLINE)
-                    && (brokerSyncStatusInfo.getBrokerRunStatus() != TStatusConstants.STATUS_SERVICE_UNDEFINED)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     /**
      * check the filter conditions and get them
      *
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index c8f99e4..c0ae34a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -46,7 +46,6 @@ import org.apache.tubemq.corebase.cluster.NodeAddrInfo;
 import org.apache.tubemq.corebase.cluster.Partition;
 import org.apache.tubemq.corebase.cluster.ProducerInfo;
 import org.apache.tubemq.corebase.cluster.SubscribeInfo;
-import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.config.TLSConfig;
 import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
 import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestB2M;
@@ -82,8 +81,6 @@ import org.apache.tubemq.corerpc.RpcServiceFactory;
 import org.apache.tubemq.corerpc.exception.StandbyException;
 import org.apache.tubemq.corerpc.service.MasterService;
 import org.apache.tubemq.server.Stoppable;
-import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.TStatusConstants;
 import org.apache.tubemq.server.common.aaaserver.CertificateMasterHandler;
 import org.apache.tubemq.server.common.aaaserver.CertifiedResult;
 import org.apache.tubemq.server.common.aaaserver.SimpleCertificateMasterHandler;
@@ -95,7 +92,6 @@ import org.apache.tubemq.server.common.offsetstorage.OffsetStorage;
 import org.apache.tubemq.server.common.offsetstorage.ZkOffsetStorage;
 import org.apache.tubemq.server.common.paramcheck.PBParameterUtils;
 import org.apache.tubemq.server.common.paramcheck.ParamCheckResult;
-import org.apache.tubemq.server.common.statusdef.ManageStatus;
 import org.apache.tubemq.server.common.utils.HasThread;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.common.utils.RowLock;
@@ -106,8 +102,9 @@ import org.apache.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
 import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
 import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerEventManager;
@@ -129,10 +126,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             currentSubInfo = new ConcurrentHashMap<>();
     private final RpcServiceFactory rpcServiceFactory =     //rpc service factory
             new RpcServiceFactory();
+    private final MetaDataManager defMetaDataManager;      // meta data manager
+    private final BrokerRunManager brokerRunManager;       // broker run status manager
     private final ConsumerEventManager consumerEventManager;    //consumer event manager
     private final TopicPSInfoManager topicPSInfoManager;        //topic publish/subscribe info manager
     private final ExecutorService executor;
-    private final BrokerInfoHolder brokerHolder;                //broker holder
     private final ProducerInfoHolder producerHolder;            //producer holder
     private final ConsumerInfoHolder consumerHolder;            //consumer holder
     private final RowLock masterRowLock;                        //lock
@@ -143,7 +141,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     private final HeartbeatManager heartbeatManager;            //heartbeat manager
     private final OffsetStorage zkOffsetStorage;                //zookeeper offset manager
     private final ShutdownHook shutdownHook;                    //shutdown hook
-    private final MetaDataManager defMetaDataManager;
     private final CertificateMasterHandler serverAuthHandler;           //server auth handler
     private AtomicBoolean shutdownHooked = new AtomicBoolean(false);
     private AtomicLong idGenerator = new AtomicLong(0);     //id generator
@@ -172,14 +169,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         this.executor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
         this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig);
         this.serverAuthHandler = new SimpleCertificateMasterHandler(this.masterConfig);
+        this.heartbeatManager = new HeartbeatManager();
+        this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
+                false, TBaseConstants.META_VALUE_UNDEFINED);
         this.producerHolder = new ProducerInfoHolder();
         this.consumerHolder = new ConsumerInfoHolder();
         this.consumerEventManager = new ConsumerEventManager(consumerHolder);
         this.topicPSInfoManager = new TopicPSInfoManager(this);
         this.loadBalancer = new DefaultLoadBalancer();
-        this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
-                false, TBaseConstants.META_VALUE_UNDEFINED);
-        this.heartbeatManager = new HeartbeatManager();
         heartbeatManager.regConsumerCheckBusiness(masterConfig.getConsumerHeartbeatTimeoutMs(),
                 new TimeoutListener() {
                     @Override
@@ -198,22 +195,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                         new ReleaseProducer().run(nodeId);
                     }
                 });
-        heartbeatManager.regBrokerCheckBusiness(masterConfig.getBrokerHeartbeatTimeoutMs(),
-                new TimeoutListener() {
-                    @Override
-                    public void onTimeout(final String nodeId, TimeoutInfo nodeInfo) throws Exception {
-                        logger.info(new StringBuilder(512).append("[Broker Timeout] ")
-                                .append(nodeId).toString());
-                        new ReleaseBroker().run(nodeId);
-                    }
-                });
-
-        this.defMetaDataManager = new MetaDataManager(masterConfig.getHostName(),
-                masterConfig.getMetaDataPath(), masterConfig.getReplicationConfig());
+        this.defMetaDataManager = new MetaDataManager(this);
+        this.brokerRunManager = new DefBrokerRunManager(this);
         this.defMetaDataManager.start();
-        this.brokerHolder =
-                new BrokerInfoHolder(this.masterConfig.getMaxAutoForbiddenCnt(),
-                        this.defMetaDataManager);
         RpcConfig rpcTcpConfig = new RpcConfig();
         rpcTcpConfig.put(RpcConstants.REQUEST_TIMEOUT,
                 masterConfig.getRpcReadTimeoutMs());
@@ -277,6 +261,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         return defMetaDataManager;
     }
 
+    public HeartbeatManager getHeartbeatManager() {
+        return heartbeatManager;
+    }
+
+    public BrokerRunManager getBrokerRunManager() {
+        return brokerRunManager;
+    }
+
     /**
      * Producer register request to master
      *
@@ -342,8 +334,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         heartbeatManager.regProducerNode(producerId);
         producerHolder.setProducerInfo(producerId,
                 new HashSet<>(transTopicSet), hostName, overtls);
-        builder.setBrokerCheckSum(this.defMetaDataManager.getBrokerInfoCheckSum());
-        builder.addAllBrokerInfos(this.defMetaDataManager.getBrokersMap(overtls).values());
+        Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
+                brokerRunManager.getBrokerStaticInfo(overtls);
+        builder.setBrokerCheckSum(brokerStaticInfo.getF0());
+        builder.addAllBrokerInfos(brokerStaticInfo.getF1().values());
         builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build());
         ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
                 buildApprovedClientConfig(request.getAppdConfig());
@@ -433,10 +427,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 transTopicSet, hostName, overtls);
         Map<String, String> availTopicPartitions = getProducerTopicPartitionInfo(producerId);
         builder.addAllTopicInfos(availTopicPartitions.values());
-        builder.setBrokerCheckSum(defMetaDataManager.getBrokerInfoCheckSum());
         builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build());
-        if (defMetaDataManager.getBrokerInfoCheckSum() != inBrokerCheckSum) {
-            builder.addAllBrokerInfos(defMetaDataManager.getBrokersMap(overtls).values());
+        Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
+                brokerRunManager.getBrokerStaticInfo(overtls);
+        builder.setBrokerCheckSum(brokerStaticInfo.getF0());
+        if (brokerStaticInfo.getF0() != inBrokerCheckSum) {
+            builder.addAllBrokerInfos(brokerStaticInfo.getF1().values());
         }
         ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
                 buildApprovedClientConfig(request.getAppdConfig());
@@ -577,7 +573,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                         sessionKey, sessionTime, sourceCount, requiredPartMap);
         paramCheckResult =
                 PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
-                        masterConfig, defMetaDataManager, topicPSInfoManager, strBuffer);
+                        masterConfig, defMetaDataManager, brokerRunManager, strBuffer);
         if (!paramCheckResult.result) {
             builder.setErrCode(paramCheckResult.errCode);
             builder.setErrMsg(paramCheckResult.errMsg);
@@ -939,6 +935,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                                                  final String rmtAddress,
                                                  boolean overtls) throws Exception {
         // #lizard forgives
+        ProcessResult result = new ProcessResult();
         final StringBuilder strBuffer = new StringBuilder(512);
         RegisterResponseM2B.Builder builder = RegisterResponseM2B.newBuilder();
         builder.setSuccess(false);
@@ -946,11 +943,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         builder.setStopWrite(false);
         builder.setTakeConfInfo(false);
         // auth
-        CertifiedResult result =
+        CertifiedResult cfResult =
                 serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo());
-        if (!result.result) {
-            builder.setErrCode(result.errCode);
-            builder.setErrMsg(result.errInfo);
+        if (!cfResult.result) {
+            builder.setErrCode(cfResult.errCode);
+            builder.setErrMsg(cfResult.errInfo);
             return builder.build();
         }
         // get clientId and check valid
@@ -964,88 +961,29 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         final String clientId = (String) paramCheckResult.checkData;
         // check authority
         checkNodeStatus(clientId, strBuffer);
-        // check broker validity
-        //
-        BrokerInfo brokerInfo =
-                new BrokerInfo(clientId, request.getEnableTls(),
-                        request.hasTlsPort() ? request.getTlsPort() : TBaseConstants.META_DEFAULT_BROKER_TLS_PORT);
-        BrokerConfEntity brokerConfEntity =
-                defMetaDataManager.getBrokerConfByBrokerId(brokerInfo.getBrokerId());
-        if (brokerConfEntity == null) {
-            builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
-            builder.setErrMsg(strBuffer
-                    .append("No broker configure info, please create first! the connecting client id is:")
-                    .append(clientId).toString());
-            return builder.build();
-        }
-        if ((!brokerInfo.getHost().equals(brokerConfEntity.getBrokerIp()))
-                || (brokerInfo.getPort() != brokerConfEntity.getBrokerPort())) {
-            builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
-            builder.setErrMsg(strBuffer
-                    .append("Inconsistent broker configure,please confirm first! the connecting client id is:")
-                    .append(clientId).append(", the configure's broker address by brokerId is:")
-                    .append(brokerConfEntity.getBrokerIdAndAddress()).toString());
-            return builder.build();
-        }
-        int confTLSPort = brokerConfEntity.getBrokerTLSPort();
-        if (confTLSPort != brokerInfo.getTlsPort()) {
-            builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
-            builder.setErrMsg(strBuffer
-                    .append("Inconsistent TLS configure,please confirm first! the connecting client id is:")
-                    .append(clientId).append(", the configured TLS port is:")
-                    .append(confTLSPort).append(", the broker reported TLS port is ")
-                    .append(brokerInfo.getTlsPort()).toString());
-            return builder.build();
-        }
-        if (brokerConfEntity.getManageStatus() == ManageStatus.STATUS_MANAGE_APPLY) {
-            builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
-            builder.setErrMsg(strBuffer
-                    .append("Broker's configure not online, please online configure first! " +
-                            "the connecting client id is:")
-                    .append(clientId).toString());
-            return builder.build();
-        }
         // get optional filed
-        boolean needFastStart = false;
+        ClusterSettingEntity defSetting =
+                defMetaDataManager.getClusterDefSetting(false);
         final long reFlowCtrlId = request.hasFlowCheckId()
                 ? request.getFlowCheckId() : TBaseConstants.META_VALUE_UNDEFINED;
         final int qryPriorityId = request.hasQryPriorityId()
                 ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
-        ConcurrentHashMap<Integer, BrokerSyncStatusInfo> brokerSyncStatusMap =
-                this.defMetaDataManager.getBrokerRunSyncManageMap();
-        // update broker status
-        Map<String, String> brokerTopicSetConfInfo =
-                this.defMetaDataManager.getBrokerTopicStrConfigInfo(brokerConfEntity, strBuffer);
-        List<String> brokerTopicSetConfInfoList =
-                new ArrayList<>(brokerTopicSetConfInfo.size());
-        for (String topicItem : brokerTopicSetConfInfo.values()) {
-            brokerTopicSetConfInfoList.add(topicItem);
-        }
-        BrokerSyncStatusInfo brokerStatusInfo =
-                new BrokerSyncStatusInfo(brokerConfEntity, brokerTopicSetConfInfoList);
-        brokerSyncStatusMap.put(brokerConfEntity.getBrokerId(), brokerStatusInfo);
-        brokerStatusInfo.updateCurrBrokerConfInfo(brokerConfEntity.getManageStatus().getCode(),
-                brokerConfEntity.isConfDataUpdated(), brokerConfEntity.isBrokerLoaded(),
-                brokerConfEntity.getBrokerDefaultConfInfo(), brokerTopicSetConfInfoList, false);
-        if (brokerTopicSetConfInfoList.isEmpty()) {
-            needFastStart = true;
-        }
-        brokerStatusInfo.setFastStart(needFastStart);
-        // set broker report info
-        if (request.getCurBrokerConfId() <= 0) {
-            brokerStatusInfo.setBrokerReportInfo(true, request.getCurBrokerConfId(),
-                    request.getConfCheckSumId(), true, request.getBrokerDefaultConfInfo(),
-                    request.getBrokerTopicSetConfInfoList(), true, request.getBrokerOnline(), overtls);
-        } else {
-            brokerStatusInfo.setBrokerReportInfo(true,
-                    brokerStatusInfo.getLastPushBrokerConfId(),
-                    brokerStatusInfo.getLastPushBrokerCheckSumId(), true,
-                    brokerConfEntity.getBrokerDefaultConfInfo(), brokerTopicSetConfInfoList, true,
-                    request.getBrokerOnline(), overtls);
-        }
-        this.defMetaDataManager.removeBrokerRunTopicInfoMap(brokerInfo.getBrokerId());
-        brokerHolder.setBrokerInfo(brokerInfo.getBrokerId(), brokerInfo);
-        heartbeatManager.regBrokerNode(String.valueOf(brokerInfo.getBrokerId()));
+        int tlsPort = request.hasTlsPort()
+                ? request.getTlsPort() : defSetting.getBrokerTLSPort();
+        // build broker info
+        BrokerInfo brokerInfo =
+                new BrokerInfo(clientId, request.getEnableTls(), tlsPort);
+        // register broker run status info
+        if (!brokerRunManager.brokerRegister2M(clientId, brokerInfo,
+                request.getCurBrokerConfId(), request.getConfCheckSumId(),
+                true, request.getBrokerDefaultConfInfo(),
+                request.getBrokerTopicSetConfInfoList(), request.getBrokerOnline(),
+                overtls, strBuffer, result)) {
+            builder.setErrCode(result.getErrCode());
+            builder.setErrMsg(result.getErrInfo());
+            return builder.build();
+        }
+        // print broker register log
         logger.info(strBuffer.append("[Broker Register] ").append(clientId)
             .append(" report, configureId=").append(request.getCurBrokerConfId())
             .append(",readStatusRpt=").append(request.getReadStatusRpt())
@@ -1056,9 +994,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             .append(",qryPriorityId=").append(qryPriorityId)
             .append(",checksumId=").append(request.getConfCheckSumId()).toString());
         strBuffer.delete(0, strBuffer.length());
-        if (request.getCurBrokerConfId() > 0) {
-            processBrokerReportConfigureInfo(brokerInfo, strBuffer);
-        }
         // response
         builder.setSuccess(true);
         builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -1074,11 +1009,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         enableInfo.setEnableConsumeAuthenticate(masterConfig.isStartConsumeAuthenticate());
         enableInfo.setEnableConsumeAuthorize(masterConfig.isStartConsumeAuthorize());
         builder.setEnableBrokerInfo(enableInfo);
-        builder.setTakeConfInfo(true);
-        builder.setCurBrokerConfId(brokerStatusInfo.getLastPushBrokerConfId());
-        builder.setConfCheckSumId(brokerStatusInfo.getLastPushBrokerCheckSumId());
-        builder.setBrokerDefaultConfInfo(brokerStatusInfo.getLastPushBrokerDefaultConfInfo());
-        builder.addAllBrokerTopicSetConfInfo(brokerStatusInfo.getLastPushBrokerTopicSetConfInfo());
+        brokerRunManager.setRegisterDownConfInfo(brokerInfo.getBrokerId(), strBuffer, builder);
         builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
         ClientMaster.ClusterConfig.Builder clusterConfigBuilder =
                 buildClusterConfig(request.getClsConfig());
@@ -1086,8 +1017,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             builder.setClsConfig(clusterConfigBuilder);
         }
         if (request.hasFlowCheckId()) {
-            ClusterSettingEntity defSetting =
-                    defMetaDataManager.getClusterDefSetting(false);
             builder.setQryPriorityId(defSetting.getQryPriorityId());
             builder.setFlowCheckId(defSetting.getSerialId());
             if (reFlowCtrlId != defSetting.getSerialId()) {
@@ -1098,16 +1027,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 }
             }
         }
-        logger.info(strBuffer.append("[TMaster sync] push broker configure: brokerId = ")
-            .append(brokerStatusInfo.getBrokerId())
-            .append(",configureId=").append(brokerStatusInfo.getLastPushBrokerConfId())
-            .append(",stopWrite=").append(builder.getStopWrite())
-            .append(",stopRead=").append(builder.getStopRead())
-            .append(",checksumId=").append(brokerStatusInfo.getLastPushBrokerCheckSumId())
-            .append(",default configure is ").append(brokerStatusInfo.getLastPushBrokerDefaultConfInfo())
-            .append(",topic configure is ").append(brokerStatusInfo.getLastPushBrokerTopicSetConfInfo())
-            .toString());
-        strBuffer.delete(0, strBuffer.length());
         logger.info(strBuffer.append("[Broker Register] ").append(clientId)
                 .append(", isOverTLS=").append(overtls).toString());
         return builder.build();
@@ -1156,73 +1075,23 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             builder.setErrMsg(paramCheckResult.errMsg);
             return builder.build();
         }
-        final String brokerId = (String) paramCheckResult.checkData;
-        checkNodeStatus(brokerId, strBuffer);
-        BrokerInfo brokerInfo = brokerHolder.getBrokerInfo(Integer.parseInt(brokerId));
-        if (brokerInfo == null) {
-            builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
-            builder.setErrMsg(strBuffer
-                    .append("Please register broker first! the connecting client id is:")
-                    .append(brokerId).toString());
-            return builder.build();
-        }
-        BrokerConfEntity brokerConfigEntry =
-                defMetaDataManager.getBrokerConfByBrokerId(brokerInfo.getBrokerId());
-        if (brokerConfigEntry == null) {
-            builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
-            builder.setErrMsg(strBuffer
-                    .append("No broker configure info, please create first! the connecting client id is:")
-                    .append(brokerInfo.toString()).toString());
-            return builder.build();
-        }
-        if (brokerConfigEntry.getManageStatus() == ManageStatus.STATUS_MANAGE_APPLY) {
-            builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
-            builder.setErrMsg(strBuffer
-                    .append("Broker's configure not online, " +
-                            "please online configure first! the connecting client id is:")
-                    .append(brokerInfo.toString()).toString());
-            return builder.build();
-        }
-        ConcurrentHashMap<Integer, BrokerSyncStatusInfo> brokerSyncStatusMap =
-                this.defMetaDataManager.getBrokerRunSyncManageMap();
-        BrokerSyncStatusInfo brokerSyncStatusInfo =
-                brokerSyncStatusMap.get(brokerInfo.getBrokerId());
-        if (brokerSyncStatusInfo == null) {
-            builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
-            builder.setErrMsg(strBuffer
-                    .append("Not found Broker run status info,please register first! the connecting client id is:")
-                    .append(brokerInfo.toString()).toString());
-            return builder.build();
-        }
-        // update heartbeat
-        try {
-            heartbeatManager.updBrokerNode(brokerId);
-        } catch (HeartbeatException e) {
-            builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
-            builder.setErrMsg(e.getMessage());
-            return builder.build();
-        }
-
-        // update broker status
-        brokerSyncStatusInfo.setBrokerReportInfo(false, request.getCurBrokerConfId(),
-                request.getConfCheckSumId(), request.getTakeConfInfo(),
-                request.getBrokerDefaultConfInfo(), request.getBrokerTopicSetConfInfoList(),
-                true, request.getBrokerOnline(), overtls);
-        processBrokerReportConfigureInfo(brokerInfo, strBuffer);
-        if (request.getTakeRemovedTopicInfo()) {
-            List<String> removedTopics = request.getRemovedTopicsInfoList();
-            logger.info(strBuffer.append("[Broker Report] receive broker confirmed removed topic list is ")
-                    .append(removedTopics.toString()).toString());
-            strBuffer.delete(0, strBuffer.length());
-            defMetaDataManager.clearRmvedTopicConfInfo(
-                    brokerConfigEntry.getBrokerId(), removedTopics, strBuffer, result);
-        }
-        brokerHolder.updateBrokerReportStatus(brokerInfo.getBrokerId(),
-                request.getReadStatusRpt(), request.getWriteStatusRpt());
+        int brokerId = (int) paramCheckResult.checkData;
         long reFlowCtrlId = request.hasFlowCheckId()
                 ? request.getFlowCheckId() : TBaseConstants.META_VALUE_UNDEFINED;
         int qryPriorityId = request.hasQryPriorityId()
                 ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
+        checkNodeStatus(String.valueOf(brokerId), strBuffer);
+        if (!brokerRunManager.brokerHeartBeat2M(brokerId,
+                request.getCurBrokerConfId(), request.getConfCheckSumId(),
+                request.getTakeConfInfo(), request.getBrokerDefaultConfInfo(),
+                request.getBrokerTopicSetConfInfoList(), request.getTakeRemovedTopicInfo(),
+                request.getRemovedTopicsInfoList(), request.getReadStatusRpt(),
+                request.getWriteStatusRpt(), request.getBrokerOnline(),
+                strBuffer, result)) {
+            builder.setErrCode(result.getErrCode());
+            builder.setErrMsg(result.getErrInfo());
+            return builder.build();
+        }
         if (request.getTakeConfInfo()) {
             strBuffer.append("[Broker Report] heartbeat report: brokerId=")
                 .append(request.getBrokerId()).append(", configureId=")
@@ -1235,15 +1104,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 .append(",qryPriorityId=").append(qryPriorityId)
                 .append(",brokerOnline=").append(request.getBrokerOnline())
                 .append(",default broker configure is ").append(request.getBrokerDefaultConfInfo())
-                .append(",broker topic configure is ").append(request.getBrokerTopicSetConfInfoList())
-                .append(",current brokerSyncStatusInfo is ");
-            logger.info(brokerSyncStatusInfo.toJsonString(strBuffer, true).toString());
+                .append(",broker topic configure is ").append(request.getBrokerTopicSetConfInfoList());
             strBuffer.delete(0, strBuffer.length());
         }
         // create response
-        builder.setNeedReportData(brokerSyncStatusInfo.needReportData());
-        builder.setCurBrokerConfId(brokerSyncStatusInfo.getLastPushBrokerConfId());
-        builder.setConfCheckSumId(brokerSyncStatusInfo.getLastPushBrokerCheckSumId());
+        brokerRunManager.setHeatBeatDownConfInfo(brokerId, strBuffer, builder);
+        BrokerConfEntity brokerConfEntity =
+                defMetaDataManager.getBrokerConfByBrokerId(brokerId);
+        builder.setTakeRemoveTopicInfo(true);
+        builder.addAllRemoveTopicConfInfo(defMetaDataManager
+                .getBrokerRemovedTopicStrConfigInfo(brokerConfEntity, strBuffer).values());
         builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
         if (request.hasFlowCheckId()) {
             ClusterSettingEntity defSetting =
@@ -1258,36 +1128,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 }
             }
         }
-        brokerHolder.setBrokerHeartBeatReqStatus(brokerInfo.getBrokerId(), builder);
         ClientMaster.ClusterConfig.Builder clusterConfigBuilder =
                 buildClusterConfig(request.getClsConfig());
         if (clusterConfigBuilder != null) {
             builder.setClsConfig(clusterConfigBuilder);
         }
-        builder.setTakeRemoveTopicInfo(true);
-        builder.addAllRemoveTopicConfInfo(defMetaDataManager
-                .getBrokerRemovedTopicStrConfigInfo(brokerConfigEntry, strBuffer).values());
-        if (brokerSyncStatusInfo.needSyncConfDataToBroker()) {
-            builder.setTakeConfInfo(true);
-            builder.setBrokerDefaultConfInfo(brokerSyncStatusInfo
-                    .getLastPushBrokerDefaultConfInfo());
-            builder.addAllBrokerTopicSetConfInfo(brokerSyncStatusInfo
-                    .getLastPushBrokerTopicSetConfInfo());
-            logger.info(strBuffer
-                .append("[Broker Report] heartbeat sync topic config: brokerId=")
-                .append(brokerId).append(", configureId=")
-                .append(brokerSyncStatusInfo.getLastPushBrokerConfId())
-                .append(",set flowCtrlId=").append(builder.getFlowCheckId())
-                .append(",stopWrite=").append(builder.getStopWrite())
-                .append(",stopRead=").append(builder.getStopRead())
-                .append(",qryPriorityId=").append(builder.getQryPriorityId())
-                .append(",checksumId=").append(brokerSyncStatusInfo.getLastPushBrokerCheckSumId())
-                .append(",default configure is ")
-                .append(brokerSyncStatusInfo.getLastPushBrokerDefaultConfInfo())
-                .append(",topic configure is ")
-                .append(brokerSyncStatusInfo.getLastPushBrokerTopicSetConfInfo())
-                .toString());
-        }
         // begin:  deprecated when brokers version equal to current master version
         builder.setAuthorizedInfo(genAuthorizedInfo(null, true));
         // end deprecated
@@ -1311,14 +1156,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     public CloseResponseM2B brokerCloseClientB2M(CloseRequestB2M request,
                                                  final String rmtAddress,
                                                  boolean overtls) throws Throwable {
+        ProcessResult result = new ProcessResult();
         StringBuilder strBuffer = new StringBuilder(512);
         CloseResponseM2B.Builder builder = CloseResponseM2B.newBuilder();
         builder.setSuccess(false);
-        CertifiedResult result =
+        CertifiedResult cfResult =
                 serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo());
-        if (!result.result) {
-            builder.setErrCode(result.errCode);
-            builder.setErrMsg(result.errInfo);
+        if (!cfResult.result) {
+            builder.setErrCode(cfResult.errCode);
+            builder.setErrMsg(cfResult.errInfo);
             return builder.build();
         }
         ParamCheckResult paramCheckResult =
@@ -1328,12 +1174,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             builder.setErrMsg(paramCheckResult.errMsg);
             return builder.build();
         }
-        final String brokerId = (String) paramCheckResult.checkData;
-        checkNodeStatus(brokerId, strBuffer);
-        logger.info(strBuffer.append("[Broker Closed]").append(brokerId)
-                .append(", isOverTLS=").append(overtls).toString());
-        new ReleaseBroker().run(brokerId);
-        heartbeatManager.unRegBrokerNode(request.getBrokerId());
+        final int brokerId = (int) paramCheckResult.checkData;
+        checkNodeStatus(String.valueOf(brokerId), strBuffer);
+        if (!brokerRunManager.brokerClose2M(brokerId, strBuffer, result)) {
+            builder.setErrCode(result.getErrCode());
+            builder.setErrMsg(result.getErrInfo());
+            return builder.build();
+        }
         builder.setSuccess(true);
         builder.setErrCode(TErrCodeConstants.SUCCESS);
         builder.setErrMsg("OK!");
@@ -1359,482 +1206,18 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
      * @return
      */
     private Map<String, String> getProducerTopicPartitionInfo(String producerId) {
-        Map<String, String> topicPartStrMap = new HashMap<>();
-        ProducerInfo producerInfo = producerHolder.getProducerInfo(producerId);
+        ProducerInfo producerInfo =
+                producerHolder.getProducerInfo(producerId);
         if (producerInfo == null) {
-            return topicPartStrMap;
+            return new HashMap<>();
         }
         Set<String> producerInfoTopicSet =
                 producerInfo.getTopicSet();
         if ((producerInfoTopicSet == null)
                 || (producerInfoTopicSet.isEmpty())) {
-            return topicPartStrMap;
-        }
-        Map<String, StringBuilder> topicPartStrBuilderMap =
-                new HashMap<>();
-        for (String topic : producerInfoTopicSet) {
-            if (topic == null) {
-                continue;
-            }
-            ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
-                    topicPSInfoManager.getBrokerPubInfo(topic);
-            if (topicInfoMap == null) {
-                continue;
-            }
-            for (Map.Entry<BrokerInfo, TopicInfo> entry : topicInfoMap.entrySet()) {
-                if (entry.getKey() == null || entry.getValue() == null) {
-                    continue;
-                }
-                if (entry.getValue().isAcceptPublish()) {
-                    StringBuilder tmpValue = topicPartStrBuilderMap.get(topic);
-                    if (tmpValue == null) {
-                        StringBuilder strBuffer =
-                                new StringBuilder(512).append(topic)
-                                        .append(TokenConstants.SEGMENT_SEP)
-                                        .append(entry.getValue().getSimpleValue());
-                        topicPartStrBuilderMap.put(topic, strBuffer);
-                    } else {
-                        tmpValue.append(TokenConstants.ARRAY_SEP)
-                                .append(entry.getValue().getSimpleValue());
-                    }
-                }
-            }
-        }
-        for (Map.Entry<String, StringBuilder> entry : topicPartStrBuilderMap.entrySet()) {
-            if (entry.getValue() != null) {
-                topicPartStrMap.put(entry.getKey(), entry.getValue().toString());
-            }
-        }
-        topicPartStrBuilderMap.clear();
-        return topicPartStrMap;
-    }
-
-    /**
-     * Update topics
-     *
-     * @param brokerInfo
-     * @param strBuffer
-     * @param curTopicInfoMap
-     * @param newTopicInfoMap
-     * @param requirePartUpdate
-     * @param requireAcceptPublish
-     * @param requireAcceptSubscribe
-     */
-    private void updateTopics(BrokerInfo brokerInfo, final StringBuilder strBuffer,
-                              Map<String/* topicName */, TopicInfo> curTopicInfoMap,
-                              Map<String/* topicName */, TopicInfo> newTopicInfoMap,
-                              boolean requirePartUpdate, boolean requireAcceptPublish,
-                              boolean requireAcceptSubscribe) {
-        List<TopicInfo> needAddTopicList = new ArrayList<>();
-        for (Map.Entry<String, TopicInfo> entry : newTopicInfoMap.entrySet()) {
-            TopicInfo newTopicInfo = entry.getValue();
-            TopicInfo oldTopicInfo = null;
-            if (curTopicInfoMap != null) {
-                oldTopicInfo = curTopicInfoMap.get(entry.getKey());
-            }
-            if (oldTopicInfo == null
-                    || oldTopicInfo.getPartitionNum() != newTopicInfo.getPartitionNum()
-                    || oldTopicInfo.getTopicStoreNum() != newTopicInfo.getTopicStoreNum()
-                    || oldTopicInfo.isAcceptPublish() != newTopicInfo.isAcceptPublish()
-                    || oldTopicInfo.isAcceptSubscribe() != newTopicInfo.isAcceptSubscribe()) {
-                if (requirePartUpdate) {
-                    if (!requireAcceptPublish) {
-                        newTopicInfo.setAcceptPublish(false);
-                    }
-                    if (!requireAcceptSubscribe) {
-                        newTopicInfo.setAcceptSubscribe(false);
-                    }
-                }
-                needAddTopicList.add(newTopicInfo);
-            }
-        }
-        updateTopicsInternal(brokerInfo, needAddTopicList, EventType.CONNECT);
-        logger.info(strBuffer.append("[addedTopicConfigMap] broker:")
-                .append(brokerInfo.toString()).append(" add topicInfo list:")
-                .append(needAddTopicList).toString());
-        strBuffer.delete(0, strBuffer.length());
-    }
-
-    /**
-     * Delete topics
-     *
-     * @param brokerInfo
-     * @param strBuffer
-     * @param curTopicInfoMap
-     * @param newTopicInfoMap
-     */
-    private void deleteTopics(BrokerInfo brokerInfo, final StringBuilder strBuffer,
-                              Map<String/* topicName */, TopicInfo> curTopicInfoMap,
-                              Map<String/* topicName */, TopicInfo> newTopicInfoMap) {
-        List<TopicInfo> needRmvTopicList = new ArrayList<>();
-        if (curTopicInfoMap != null) {
-            for (Map.Entry<String, TopicInfo> entry : curTopicInfoMap.entrySet()) {
-                if (newTopicInfoMap.get(entry.getKey()) == null) {
-                    needRmvTopicList.add(entry.getValue());
-                }
-            }
-        }
-        updateTopicsInternal(brokerInfo, needRmvTopicList, EventType.DISCONNECT);
-        logger.info(strBuffer.append("[removedTopicConfigMap] broker:")
-                .append(brokerInfo.toString()).append(" removed topicInfo list:")
-                .append(needRmvTopicList).toString());
-        strBuffer.delete(0, strBuffer.length());
-    }
-
-    /**
-     * Process broker report configure info
-     *
-     * @param brokerInfo
-     * @param strBuffer
-     */
-    private void processBrokerReportConfigureInfo(BrokerInfo brokerInfo,
-                                                  final StringBuilder strBuffer) {
-        ProcessResult result = new ProcessResult();
-        // #lizard forgives
-        BrokerSyncStatusInfo brokerSyncStatusInfo =
-                this.defMetaDataManager.getBrokerRunSyncStatusInfo(brokerInfo.getBrokerId());
-        if (brokerSyncStatusInfo == null) {
-            logger.error(strBuffer
-                    .append("Fail to find broker run manage configure! broker is ")
-                    .append(brokerInfo.toString()).toString());
-            strBuffer.delete(0, strBuffer.length());
-            return;
-        }
-        boolean requireAcceptPublish = false;
-        boolean requireAcceptSubscribe = false;
-        boolean requirePartUpdate = false;
-        boolean requireSyncClient = false;
-        int brokerManageStatus = brokerSyncStatusInfo.getBrokerManageStatus();
-        int brokerRunStatus = brokerSyncStatusInfo.getBrokerRunStatus();
-        long subStepOpTimeInMills = brokerSyncStatusInfo.getSubStepOpTimeInMills();
-        boolean isBrokerRegister = brokerSyncStatusInfo.isBrokerRegister();
-        boolean isBrokerOnline = brokerSyncStatusInfo.isBrokerOnline();
-        if (!isBrokerRegister) {
-            return;
-        }
-        if (brokerManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) {
-            if (isBrokerOnline) {
-                if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
-                    return;
-                } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_REGISTER
-                        || brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_ONLINE) {
-                    brokerSyncStatusInfo.setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ);
-                    requireAcceptSubscribe = true;
-                    requireSyncClient = true;
-                } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ) {
-                    if ((brokerSyncStatusInfo.isBrokerConfChanged())
-                            || (!brokerSyncStatusInfo.isBrokerLoaded())) {
-                        long waitTime =
-                                brokerSyncStatusInfo.isFastStart() ? masterConfig.getStepChgWaitPeriodMs()
-                                        : masterConfig.getOnlineOnlyReadToRWPeriodMs();
-                        if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
-                            brokerSyncStatusInfo
-                                    .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_READ_AND_WRITE);
-                            requireAcceptPublish = true;
-                            requireAcceptSubscribe = true;
-                            requireSyncClient = true;
-                        }
-                    } else {
-                        brokerSyncStatusInfo
-                                .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_READ_AND_WRITE);
-                        requireAcceptPublish = true;
-                        requireAcceptSubscribe = true;
-                        requireSyncClient = true;
-                    }
-                } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_READ_AND_WRITE) {
-                    long waitTime =
-                            brokerSyncStatusInfo.isFastStart() ? 0 : masterConfig.getStepChgWaitPeriodMs();
-                    if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
-                        brokerSyncStatusInfo.setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_UNDEFINED);
-                        brokerSyncStatusInfo.setFastStart(true);
-                        requireAcceptPublish = true;
-                        requireAcceptSubscribe = true;
-                        requireSyncClient = true;
-                        if ((brokerSyncStatusInfo.isBrokerConfChanged())
-                                || (!brokerSyncStatusInfo.isBrokerLoaded())) {
-                            defMetaDataManager.updateBrokerConfChanged(brokerInfo.getBrokerId(),
-                                    false, true, strBuffer, result);
-                        }
-                    }
-                } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_PART_WAIT_REGISTER) {
-                    brokerSyncStatusInfo
-                            .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_PART_WAIT_ONLINE);
-                    requireAcceptSubscribe = true;
-                    requireSyncClient = true;
-                    requirePartUpdate = true;
-                } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_PART_WAIT_ONLINE) {
-                    brokerSyncStatusInfo
-                            .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_PART_ONLY_READ);
-                    requireAcceptSubscribe = true;
-                    requireSyncClient = true;
-                    requirePartUpdate = true;
-                } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_PART_ONLY_READ) {
-                    if ((brokerSyncStatusInfo.isBrokerConfChanged())
-                            || (!brokerSyncStatusInfo.isBrokerLoaded())) {
-                        long waitTime =
-                                brokerSyncStatusInfo.isFastStart()
-                                        ? 0 : masterConfig.getOnlineOnlyReadToRWPeriodMs();
-                        if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
-                            brokerSyncStatusInfo
-                                    .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_READ_AND_WRITE);
-                            requireAcceptPublish = true;
-                            requireAcceptSubscribe = true;
-                            requireSyncClient = true;
-                            requirePartUpdate = true;
-                        }
-                    } else {
-                        brokerSyncStatusInfo
-                                .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_READ_AND_WRITE);
-                        requireAcceptPublish = true;
-                        requireAcceptSubscribe = true;
-                        requireSyncClient = true;
-                        requirePartUpdate = true;
-                    }
-                }
-            } else {
-                if (brokerRunStatus != TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_ONLINE) {
-                    brokerSyncStatusInfo
-                            .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_ONLINE);
-                    requireSyncClient = true;
-                }
-            }
-        } else {
-            if (!isBrokerOnline
-                    || brokerRunStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
-                return;
-            }
-            if (brokerManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
-                if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ) {
-                    long waitTime =
-                            brokerSyncStatusInfo.isFastStart() ? 0 : masterConfig.getStepChgWaitPeriodMs();
-                    if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
-                        brokerSyncStatusInfo
-                                .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE);
-                        requireAcceptSubscribe = true;
-                        requireSyncClient = true;
-                    }
-                } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE) {
-                    long waitTime =
-                            brokerSyncStatusInfo.isFastStart()
-                                    ? masterConfig.getStepChgWaitPeriodMs()
-                                    : masterConfig.getOfflineOnlyReadToRWPeriodMs();
-                    if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
-                        brokerSyncStatusInfo.setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_UNDEFINED);
-                        brokerSyncStatusInfo.setFastStart(true);
-                        requireAcceptSubscribe = true;
-                        requireSyncClient = true;
-                    }
-                }
-            } else if (brokerManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
-                if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_WRITE) {
-                    long waitTime =
-                            brokerSyncStatusInfo.isFastStart() ? 0 : masterConfig.getStepChgWaitPeriodMs();
-                    if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
-                        brokerSyncStatusInfo
-                                .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE);
-                        requireAcceptPublish = true;
-                        requireSyncClient = true;
-                    }
-                } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE) {
-                    long waitTime =
-                            brokerSyncStatusInfo.isFastStart() ? masterConfig.getStepChgWaitPeriodMs()
-                                    : masterConfig.getOfflineOnlyReadToRWPeriodMs();
-                    if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
-                        brokerSyncStatusInfo.setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_UNDEFINED);
-                        brokerSyncStatusInfo.setFastStart(true);
-                        requireAcceptPublish = true;
-                        requireSyncClient = true;
-                    }
-                }
-            } else if (brokerManageStatus == TStatusConstants.STATUS_MANAGE_OFFLINE) {
-                if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_WRITE) {
-                    brokerSyncStatusInfo
-                            .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_READ_WRITE);
-                    requireAcceptSubscribe = true;
-                    requireSyncClient = true;
-                } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_READ_WRITE) {
-                    long waitTime =
-                            brokerSyncStatusInfo.isFastStart() ? 0 : masterConfig.getStepChgWaitPeriodMs();
-                    if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
-                        brokerSyncStatusInfo
-                                .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE);
-                        requireSyncClient = true;
-                    }
-                } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE) {
-                    long waitTime = brokerSyncStatusInfo.isFastStart()
-                            ? masterConfig.getStepChgWaitPeriodMs()
-                            : masterConfig.getOfflineOnlyReadToRWPeriodMs();
-                    if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
-                        brokerSyncStatusInfo.setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_UNDEFINED);
-                        brokerSyncStatusInfo.setFastStart(true);
-                        requireSyncClient = true;
-                    }
-                }
-            }
-        }
-
-        if (requireSyncClient) {
-            updateTopicInfoToClient(brokerInfo, requirePartUpdate,
-                    requireAcceptPublish, requireAcceptSubscribe, strBuffer);
-        }
-    }
-
-    /**
-     * Update topic info to client
-     *
-     * @param brokerInfo
-     * @param requirePartUpdate
-     * @param requireAcceptPublish
-     * @param requireAcceptSubscribe
-     * @param strBuffer
-     */
-    private void updateTopicInfoToClient(BrokerInfo brokerInfo,
-                                         boolean requirePartUpdate,
-                                         boolean requireAcceptPublish,
-                                         boolean requireAcceptSubscribe,
-                                         final StringBuilder strBuffer) {
-        // #lizard forgives
-        // check broker status
-        BrokerSyncStatusInfo brokerSyncStatusInfo =
-                this.defMetaDataManager.getBrokerRunSyncStatusInfo(brokerInfo.getBrokerId());
-        if (brokerSyncStatusInfo == null) {
-            logger.error(strBuffer
-                    .append("Fail to find broker run manage configure, not update topic info! broker is ")
-                    .append(brokerInfo.toString()).toString());
-            strBuffer.delete(0, strBuffer.length());
-            return;
-        }
-        // get broker config and then generate topic status record
-        String brokerDefaultConfInfo = brokerSyncStatusInfo.getReportedBrokerDefaultConfInfo();
-        int brokerManageStatusId = brokerSyncStatusInfo.getBrokerManageStatus();
-        if (TStringUtils.isBlank(brokerDefaultConfInfo)) {
-            return;
-        }
-        // get broker status and topic default config
-        boolean acceptPublish = false;
-        boolean acceptSubscribe = false;
-        if (brokerManageStatusId >= TStatusConstants.STATUS_MANAGE_ONLINE) {
-            if (brokerManageStatusId == TStatusConstants.STATUS_MANAGE_ONLINE) {
-                acceptPublish = true;
-                acceptSubscribe = true;
-            } else if (brokerManageStatusId == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
-                acceptSubscribe = true;
-            } else if (brokerManageStatusId == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
-                acceptPublish = true;
-            }
-        }
-        List<String> brokerTopicSetConfInfo =
-                brokerSyncStatusInfo.getReportedBrokerTopicSetConfInfo();
-        String[] brokerDefaultConfInfoArr = brokerDefaultConfInfo.split(TokenConstants.ATTR_SEP);
-        int numPartitions = Integer.parseInt(brokerDefaultConfInfoArr[0]);
-        boolean cfgAcceptPublish = Boolean.parseBoolean(brokerDefaultConfInfoArr[1]);
-        boolean cfgAcceptSubscribe = Boolean.parseBoolean(brokerDefaultConfInfoArr[2]);
-        int numTopicStores = 1;
-        if (brokerDefaultConfInfoArr.length > 7) {
-            if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[7])) {
-                numTopicStores = Integer.parseInt(brokerDefaultConfInfoArr[7]);
-            }
-        }
-        int unFlushDataHold = TServerConstants.CFG_DEFAULT_DATA_UNFLUSH_HOLD;
-        if (brokerDefaultConfInfoArr.length > 8) {
-            if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[8])) {
-                unFlushDataHold = Integer.parseInt(brokerDefaultConfInfoArr[8]);
-            }
-        }
-        ConcurrentHashMap<String/* topic */, TopicInfo> newTopicInfoMap =
-                new ConcurrentHashMap<>();
-        // according to broker status and default config, topic config, make up current status record
-        for (String strTopicConfInfo : brokerTopicSetConfInfo) {
-            if (TStringUtils.isBlank(strTopicConfInfo)) {
-                continue;
-            }
-            String[] topicConfInfoArr =
-                    strTopicConfInfo.split(TokenConstants.ATTR_SEP);
-            final String tmpTopic = topicConfInfoArr[0];
-            int tmpPartNum = numPartitions;
-            if (!TStringUtils.isBlank(topicConfInfoArr[1])) {
-                tmpPartNum = Integer.parseInt(topicConfInfoArr[1]);
-            }
-            boolean tmpAcceptPublish = cfgAcceptPublish;
-            if (!TStringUtils.isBlank(topicConfInfoArr[2])) {
-                tmpAcceptPublish = Boolean.parseBoolean(topicConfInfoArr[2]);
-            }
-            if (!acceptPublish) {
-                tmpAcceptPublish = acceptPublish;
-            } else {
-                if (!requirePartUpdate) {
-                    if (!requireAcceptPublish) {
-                        tmpAcceptPublish = false;
-                    }
-                }
-            }
-            int tmpNumTopicStores = numTopicStores;
-            if (!TStringUtils.isBlank(topicConfInfoArr[8])) {
-                tmpNumTopicStores = Integer.parseInt(topicConfInfoArr[8]);
-                tmpNumTopicStores = tmpNumTopicStores > 0 ? tmpNumTopicStores : numTopicStores;
-            }
-            boolean tmpAcceptSubscribe = cfgAcceptSubscribe;
-            if (!TStringUtils.isBlank(topicConfInfoArr[3])) {
-                tmpAcceptSubscribe = Boolean.parseBoolean(topicConfInfoArr[3]);
-            }
-            if (!acceptSubscribe) {
-                tmpAcceptSubscribe = acceptSubscribe;
-            } else {
-                if (!requirePartUpdate) {
-                    if (!requireAcceptSubscribe) {
-                        tmpAcceptSubscribe = false;
-                    }
-                }
-            }
-            newTopicInfoMap.put(tmpTopic, new TopicInfo(brokerInfo, tmpTopic,
-                    tmpPartNum, tmpNumTopicStores, tmpAcceptPublish, tmpAcceptSubscribe));
-        }
-
-        ConcurrentHashMap<String/* topicName */, TopicInfo> oldTopicInfoMap =
-                defMetaDataManager.getBrokerRunTopicInfoMap(brokerInfo.getBrokerId());
-        deleteTopics(brokerInfo, strBuffer, oldTopicInfoMap, newTopicInfoMap);
-        updateTopics(brokerInfo, strBuffer, oldTopicInfoMap, newTopicInfoMap,
-                requirePartUpdate, requireAcceptPublish, requireAcceptSubscribe);
-        defMetaDataManager.updateBrokerRunTopicInfoMap(brokerInfo.getBrokerId(), newTopicInfoMap);
-    }
-
-    /**
-     * Update topic internal
-     *
-     * @param broker
-     * @param topicList
-     * @param type
-     */
-    private void updateTopicsInternal(BrokerInfo broker,
-                                      List<TopicInfo> topicList,
-                                      EventType type) {
-        List<TopicInfo> cloneTopicList = new ArrayList<>();
-        for (TopicInfo topicInfo : topicList) {
-            cloneTopicList.add(topicInfo.clone());
-        }
-        for (TopicInfo topicInfo : cloneTopicList) {
-            Integer lid = null;
-            try {
-                lid = this.masterRowLock.getLock(null, StringUtils.getBytesUtf8(topicInfo.getTopic()), true);
-                ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
-                        topicPSInfoManager.getBrokerPubInfo(topicInfo.getTopic());
-                if (topicInfoMap == null) {
-                    topicInfoMap = new ConcurrentHashMap<>();
-                    topicPSInfoManager.setBrokerPubInfo(topicInfo.getTopic(), topicInfoMap);
-                }
-                if (EventType.CONNECT == type) {
-                    topicInfoMap.put(broker, topicInfo);
-                } else {
-                    topicInfoMap.remove(broker);
-                }
-            } catch (IOException e) {
-                logger.error("Get lock error!", e);
-            } finally {
-                if (lid != null) {
-                    this.masterRowLock.releaseRowLock(lid);
-                }
-            }
+            return new HashMap<>();
         }
+        return brokerRunManager.getPubBrokerAcceptPubPartInfo(producerInfoTopicSet);
     }
 
     @Override
@@ -1945,13 +1328,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         // choose different load balance strategy
         if (isFirstReb) {
             finalSubInfoMap = this.loadBalancer.bukAssign(consumerHolder,
-                    topicPSInfoManager, groups, defMetaDataManager,
+                    brokerRunManager, groups, defMetaDataManager,
                     masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
         } else {
             finalSubInfoMap = this.loadBalancer.balanceCluster(currentSubInfo,
-                    consumerHolder, brokerHolder, topicPSInfoManager, groups,
-                    defMetaDataManager, masterConfig.getMaxGroupBrokerConsumeRate(),
-                    strBuffer);
+                    consumerHolder, brokerRunManager, groups, defMetaDataManager,
+                    masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
         }
         // allocate partitions to consumers
         for (Map.Entry<String, Map<String, List<Partition>>> entry : finalSubInfoMap.entrySet()) {
@@ -2079,11 +1461,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         // choose different load balance strategy
         if (isFirstReb) {
             finalSubInfoMap =  this.loadBalancer.resetBukAssign(consumerHolder,
-                    topicPSInfoManager, groups, this.zkOffsetStorage,
+                    brokerRunManager, groups, this.zkOffsetStorage,
                     this.defMetaDataManager, strBuffer);
         } else {
             finalSubInfoMap = this.loadBalancer.resetBalanceCluster(currentSubInfo,
-                    consumerHolder, topicPSInfoManager, groups, this.zkOffsetStorage,
+                    consumerHolder, brokerRunManager, groups, this.zkOffsetStorage,
                     this.defMetaDataManager, strBuffer);
         }
         // filter
@@ -2419,8 +1801,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         return topicPSInfoManager;
     }
 
-    public BrokerInfoHolder getBrokerHolder() {
-        return brokerHolder;
+    public BrokerAbnHolder getBrokerAbnHolder() {
+        return brokerRunManager.getBrokerAbnHolder();
     }
 
     public ProducerInfoHolder getProducerHolder() {
@@ -2497,22 +1879,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         }
     }
 
-    private class ReleaseBroker extends AbstractReleaseRunner {
-        @Override
-        void run(String arg) {
-            int brokerId = Integer.parseInt(arg);
-            BrokerInfo broker = brokerHolder.removeBroker(brokerId);
-            if (broker != null) {
-                List<TopicInfo> topicInfoList =
-                        topicPSInfoManager.getBrokerPubInfoList(broker);
-                if (topicInfoList != null) {
-                    updateTopicsInternal(broker, topicInfoList, EventType.DISCONNECT);
-                }
-                defMetaDataManager.resetBrokerReportInfo(broker.getBrokerId());
-            }
-        }
-    }
-
     private class ReleaseProducer extends AbstractReleaseRunner {
         @Override
         void run(String clientId) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
index dd39f7a..4f67e84 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
@@ -38,8 +38,7 @@ import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
 import org.apache.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
 import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
 import org.apache.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
@@ -47,6 +46,8 @@ import org.apache.tubemq.server.master.nodemanage.nodeconsumer.RebProcessInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+
 /* Load balance class for server side load balance, (partition size) mod (consumer size) */
 public class DefaultLoadBalancer implements LoadBalancer {
     private static final Logger logger = LoggerFactory.getLogger(LoadBalancer.class);
@@ -61,8 +62,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
      *
      * @param clusterState
      * @param consumerHolder
-     * @param brokerHolder
-     * @param topicPSInfoManager
+     * @param brokerRunManager
      * @param groupSet
      * @param metaDataManager
      * @param defAllowBClientRate
@@ -73,8 +73,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
     public Map<String, Map<String, List<Partition>>> balanceCluster(
             Map<String, Map<String, Map<String, Partition>>> clusterState,
             ConsumerInfoHolder consumerHolder,
-            BrokerInfoHolder brokerHolder,
-            TopicPSInfoManager topicPSInfoManager,
+            BrokerRunManager brokerRunManager,
             List<String> groupSet,
             MetaDataManager metaDataManager,
             int defAllowBClientRate,
@@ -129,7 +128,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
                         ? offsetResetGroupEntity.getAllowedBrokerClientRate() : -2;
                 int allowRate = confAllowBClientRate > 0
                         ? confAllowBClientRate : defAllowBClientRate;
-                int maxBrokerCount = topicPSInfoManager.getTopicMaxBrokerCount(topicSet);
+                int maxBrokerCount =
+                        brokerRunManager.getSubTopicMaxBrokerCount(topicSet);
                 int curBClientRate = (int) Math.floor(maxBrokerCount / newConsumerList.size());
                 if (curBClientRate > allowRate) {
                     int minClientCnt = maxBrokerCount / allowRate;
@@ -160,7 +160,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 }
             }
             List<ConsumerInfo> newConsumerList2 = new ArrayList<>();
-            Map<String, Partition> psMap = topicPSInfoManager.getPartitionMap(topicSet);
+            Map<String, Partition> partMap =
+                    brokerRunManager.getSubBrokerAcceptSubParts(topicSet);
             Map<String, NodeRebInfo> rebProcessInfoMap = consumerBandInfo.getRebalanceMap();
             for (ConsumerInfo consumer : newConsumerList) {
                 Map<String, List<Partition>> partitions = new HashMap<>();
@@ -188,7 +189,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                         Map<String, Partition> partitionMap = entry.getValue();
                         if (partitionMap != null && !partitionMap.isEmpty()) {
                             for (Partition partition : partitionMap.values()) {
-                                Partition curPart = psMap.remove(partition.getPartitionKey());
+                                Partition curPart = partMap.remove(partition.getPartitionKey());
                                 if (curPart != null) {
                                     ps.add(curPart);
                                 }
@@ -198,10 +199,10 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 }
             }
             // random allocate
-            if (psMap.size() > 0) {
+            if (partMap.size() > 0) {
                 onlineOfflineGroupSet.add(group);
                 if (!newConsumerList2.isEmpty()) {
-                    this.randomAssign(psMap, newConsumerList2,
+                    this.randomAssign(partMap, newConsumerList2,
                             finalSubInfoMap, clusterState, rebProcessInfo.needProcessList);
                 }
             }
@@ -227,7 +228,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
         }
         if (groupsNeedToBalance.size() > 0) {
             finalSubInfoMap =
-                    balance(finalSubInfoMap, consumerHolder, topicPSInfoManager,
+                    balance(finalSubInfoMap, consumerHolder, brokerRunManager,
                             groupsNeedToBalance, clusterState, rejGroupClientINfoMap);
         }
         if (!rejGroupClientINfoMap.isEmpty()) {
@@ -244,7 +245,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
     private Map<String, Map<String, List<Partition>>> balance(
             Map<String, Map<String, List<Partition>>> clusterState,
             ConsumerInfoHolder consumerHolder,
-            TopicPSInfoManager topicPSInfoManager,
+            BrokerRunManager brokerRunManager,
             List<String> groupSet,
             Map<String, Map<String, Map<String, Partition>>> oldClusterState,
             Map<String, RebProcessInfo> rejGroupClientInfoMap) {
@@ -287,7 +288,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
             }
             // sort consumer and partitions, then mod
             Set<String> topics = consumerBandInfo.getTopicSet();
-            Map<String, Partition> psPartMap = topicPSInfoManager.getPartitionMap(topics);
+            Map<String, Partition> psPartMap =
+                    brokerRunManager.getSubBrokerAcceptSubParts(topics);
             int min = psPartMap.size() / consumerList.size();
             int max = psPartMap.size() % consumerList.size() == 0 ? min : min + 1;
             int serverNumToLoadMax = psPartMap.size() % consumerList.size();
@@ -482,7 +484,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
      * Assign consumer partitions
      *
      * @param consumerHolder
-     * @param topicPSInfoManager
+     * @param brokerRunManager
      * @param groupSet
      * @param metaDataManager
      * @param defAllowBClientRate
@@ -492,7 +494,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
     @Override
     public Map<String, Map<String, List<Partition>>> bukAssign(
             ConsumerInfoHolder consumerHolder,
-            TopicPSInfoManager topicPSInfoManager,
+            BrokerRunManager brokerRunManager,
             List<String> groupSet,
             MetaDataManager metaDataManager,
             int defAllowBClientRate,
@@ -529,7 +531,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
                     ? offsetResetGroupEntity.getAllowedBrokerClientRate() : -2;
             int allowRate = confAllowBClientRate > 0
                     ? confAllowBClientRate : defAllowBClientRate;
-            int maxBrokerCount = topicPSInfoManager.getTopicMaxBrokerCount(topicSet);
+            int maxBrokerCount =
+                    brokerRunManager.getSubTopicMaxBrokerCount(topicSet);
             int curBClientRate = (int) Math.floor(maxBrokerCount / consumerList.size());
             if (curBClientRate > allowRate) {
                 int minClientCnt = maxBrokerCount / allowRate;
@@ -555,7 +558,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
             // sort and mod
             Collections.sort(consumerList);
             for (String topic : topicSet) {
-                List<Partition> partPubList = topicPSInfoManager.getPartitionList(topic);
+                List<Partition> partPubList =
+                        brokerRunManager.getSubBrokerAcceptSubParts(topic);
                 Collections.sort(partPubList);
                 int partsPerConsumer = partPubList.size() / consumerList.size();
                 int consumersWithExtraPart = partPubList.size() % consumerList.size();
@@ -588,7 +592,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
      * Reset
      *
      * @param consumerHolder
-     * @param topicPSInfoManager
+     * @param brokerRunManager
      * @param groupSet
      * @param zkOffsetStorage
      * @param metaDataManager
@@ -597,11 +601,11 @@ public class DefaultLoadBalancer implements LoadBalancer {
      */
     @Override
     public Map<String, Map<String, Map<String, Partition>>> resetBukAssign(
-            ConsumerInfoHolder consumerHolder, TopicPSInfoManager topicPSInfoManager,
+            ConsumerInfoHolder consumerHolder, BrokerRunManager brokerRunManager,
             List<String> groupSet, OffsetStorage zkOffsetStorage,
             MetaDataManager metaDataManager, final StringBuilder strBuffer) {
         return inReBalanceCluster(false, consumerHolder,
-                topicPSInfoManager, groupSet, zkOffsetStorage, metaDataManager, strBuffer);
+                brokerRunManager, groupSet, zkOffsetStorage, metaDataManager, strBuffer);
     }
 
     /**
@@ -609,7 +613,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
      *
      * @param clusterState
      * @param consumerHolder
-     * @param topicPSInfoManager
+     * @param brokerRunManager
      * @param groupSet
      * @param zkOffsetStorage
      * @param metaDataManager
@@ -619,18 +623,18 @@ public class DefaultLoadBalancer implements LoadBalancer {
     @Override
     public Map<String, Map<String, Map<String, Partition>>> resetBalanceCluster(
             Map<String, Map<String, Map<String, Partition>>> clusterState,
-            ConsumerInfoHolder consumerHolder, TopicPSInfoManager topicPSInfoManager,
+            ConsumerInfoHolder consumerHolder, BrokerRunManager brokerRunManager,
             List<String> groupSet, OffsetStorage zkOffsetStorage,
             MetaDataManager metaDataManager, final StringBuilder strBuffer) {
         return inReBalanceCluster(true, consumerHolder,
-                topicPSInfoManager, groupSet, zkOffsetStorage, metaDataManager, strBuffer);
+                brokerRunManager, groupSet, zkOffsetStorage, metaDataManager, strBuffer);
     }
 
     // #lizard forgives
     private Map<String, Map<String, Map<String, Partition>>> inReBalanceCluster(
             boolean isResetRebalance,
             ConsumerInfoHolder consumerHolder,
-            TopicPSInfoManager topicPSInfoManager,
+            BrokerRunManager brokerRunManager,
             List<String> groupSet,
             OffsetStorage zkOffsetStorage,
             MetaDataManager metaDataManager,
@@ -687,12 +691,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
             // actual reset offset
             Map<String, Long> partsOffsetMap = consumerBandInfo.getPartOffsetMap();
             List<OffsetStorageInfo> offsetInfoList = new ArrayList<>();
-            Set<Partition> partPubList =
-                    topicPSInfoManager.getPartitions(consumerBandInfo.getTopicSet());
-            Map<String, Partition> partitionMap = new HashMap<>();
-            for (Partition partition : partPubList) {
-                partitionMap.put(partition.getPartitionKey(), partition);
-            }
+            Map<String, Partition> partitionMap =
+                    brokerRunManager.getSubBrokerAcceptSubParts(consumerBandInfo.getTopicSet());
             for (Entry<String, String> entry : partsConsumerMap.entrySet()) {
                 Partition foundPart = partitionMap.get(entry.getKey());
                 if (foundPart != null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/LoadBalancer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/LoadBalancer.java
index 98d1204..b17058b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/LoadBalancer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/LoadBalancer.java
@@ -23,8 +23,7 @@ import org.apache.tubemq.corebase.cluster.ConsumerInfo;
 import org.apache.tubemq.corebase.cluster.Partition;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorage;
 import org.apache.tubemq.server.master.metamanage.MetaDataManager;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
 
 
@@ -33,8 +32,7 @@ public interface LoadBalancer {
     Map<String, Map<String, List<Partition>>> balanceCluster(
             Map<String, Map<String, Map<String, Partition>>> clusterState,
             ConsumerInfoHolder consumerHolder,
-            BrokerInfoHolder brokerHolder,
-            TopicPSInfoManager topicPSInfoManager,
+            BrokerRunManager brokerRunManager,
             List<String> groups,
             MetaDataManager metaDataManager,
             int defAllowBClientRate,
@@ -43,21 +41,21 @@ public interface LoadBalancer {
     Map<String, Map<String, Map<String, Partition>>> resetBalanceCluster(
             Map<String, Map<String, Map<String, Partition>>> clusterState,
             ConsumerInfoHolder consumerHolder,
-            TopicPSInfoManager topicPSInfoManager,
+            BrokerRunManager brokerRunManager,
             List<String> groups,
             OffsetStorage zkOffsetStorage,
             MetaDataManager metaDataManager,
             final StringBuilder sBuilder);
 
     Map<String, Map<String, List<Partition>>> bukAssign(ConsumerInfoHolder consumerHolder,
-                                                        TopicPSInfoManager topicPSInfoManager,
+                                                        BrokerRunManager brokerRunManager,
                                                         List<String> groups,
                                                         MetaDataManager metaDataManager,
                                                         int defAllowBClientRate,
                                                         final StringBuilder sBuilder);
 
     Map<String, Map<String, Map<String, Partition>>> resetBukAssign(ConsumerInfoHolder consumerHolder,
-                                                                    TopicPSInfoManager topicPSInfoManager,
+                                                                    BrokerRunManager brokerRunManager,
                                                                     List<String> groups,
                                                                     OffsetStorage zkOffsetStorage,
                                                                     MetaDataManager metaDataManager,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index a34a13a..3e1ebc8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -25,28 +25,25 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TErrCodeConstants;
 import org.apache.tubemq.corebase.TokenConstants;
-import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.KeyBuilderUtils;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.Server;
 import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.TStatusConstants;
 import org.apache.tubemq.server.common.fileconfig.MasterReplicationConfig;
 import org.apache.tubemq.server.common.statusdef.ManageStatus;
 import org.apache.tubemq.server.common.statusdef.TopicStatus;
 import org.apache.tubemq.server.common.statusdef.TopicStsChgType;
 import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.MasterConfig;
+import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.bdbstore.MasterGroupStatus;
 import org.apache.tubemq.server.master.metamanage.metastore.BdbMetaStoreServiceImpl;
 import org.apache.tubemq.server.master.metamanage.metastore.MetaStoreService;
@@ -58,7 +55,8 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResC
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunStatusInfo;
 import org.apache.tubemq.server.master.web.handler.BrokerProcessResult;
 import org.apache.tubemq.server.master.web.handler.GroupProcessResult;
 import org.apache.tubemq.server.master.web.handler.TopicProcessResult;
@@ -72,34 +70,27 @@ public class MetaDataManager implements Server {
 
     private static final Logger logger =
             LoggerFactory.getLogger(MetaDataManager.class);
+    private final TMaster tMaster;
     private static final ClusterSettingEntity defClusterSetting =
             new ClusterSettingEntity().fillDefaultValue();
     private final MasterReplicationConfig replicationConfig;
     private final ScheduledExecutorService scheduledExecutorService;
-    private final ConcurrentHashMap<Integer, String> brokersMap =
-            new ConcurrentHashMap<>();
-    private final ConcurrentHashMap<Integer, String> brokersTLSMap =
-            new ConcurrentHashMap<>();
-
     private final MasterGroupStatus masterGroupStatus = new MasterGroupStatus();
 
-    private ConcurrentHashMap<Integer/* brokerId */, BrokerSyncStatusInfo> brokerRunSyncManageMap =
-            new ConcurrentHashMap<>();
-    private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashMap<String/* topicName */, TopicInfo>>
-            brokerRunTopicInfoStoreMap = new ConcurrentHashMap<>();
     private volatile boolean isStarted = false;
     private volatile boolean isStopped = false;
     private MetaStoreService metaStoreService;
-    private AtomicLong brokerInfoCheckSum = new AtomicLong(System.currentTimeMillis());
-    private long lastBrokerUpdatedTime = System.currentTimeMillis();
     private long serviceStartTime = System.currentTimeMillis();
 
 
-    public MetaDataManager(String nodeHost, String metaDataPath,
-                           MasterReplicationConfig replicationConfig) {
-        this.replicationConfig = replicationConfig;
+
+    public MetaDataManager(TMaster tMaster) {
+        this.tMaster = tMaster;
+        MasterConfig masterConfig = this.tMaster.getMasterConfig();
+        this.replicationConfig = masterConfig.getReplicationConfig();
         this.metaStoreService =
-                new BdbMetaStoreServiceImpl(nodeHost, metaDataPath, replicationConfig);
+                new BdbMetaStoreServiceImpl(masterConfig.getHostName(),
+                        masterConfig.getMetaDataPath(), this.replicationConfig);
 
         this.scheduledExecutorService =
                 Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@@ -138,40 +129,8 @@ public class MetaDataManager implements Server {
             }
         }, 0, replicationConfig.getRepStatusCheckTimeoutMs(), TimeUnit.MILLISECONDS);
         // initial running data
-        StringBuilder sBuffer = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
-        Map<Integer, BrokerConfEntity> curBrokerConfInfo =
-                this.metaStoreService.getBrokerConfInfo(null);
-        for (BrokerConfEntity entity : curBrokerConfInfo.values()) {
-            updateBrokerMaps(entity);
-            if (entity.getManageStatus().isApplied()) {
-                boolean needFastStart = false;
-                BrokerSyncStatusInfo brokerSyncStatusInfo =
-                        this.brokerRunSyncManageMap.get(entity.getBrokerId());
-                Map<String, String> brokerTopicSetConfInfo = getBrokerTopicStrConfigInfo(entity, sBuffer);
-                List<String> brokerTopicSetConfInfoList =
-                        new ArrayList<>(brokerTopicSetConfInfo.size());
-                for (String topicItem : brokerTopicSetConfInfo.values()) {
-                    brokerTopicSetConfInfoList.add(topicItem);
-                }
-                if (brokerSyncStatusInfo == null) {
-                    brokerSyncStatusInfo =
-                            new BrokerSyncStatusInfo(entity, brokerTopicSetConfInfoList);
-                    BrokerSyncStatusInfo tmpBrokerSyncStatusInfo =
-                            brokerRunSyncManageMap.putIfAbsent(entity.getBrokerId(),
-                                    brokerSyncStatusInfo);
-                    if (tmpBrokerSyncStatusInfo != null) {
-                        brokerSyncStatusInfo = tmpBrokerSyncStatusInfo;
-                    }
-                }
-                if (brokerTopicSetConfInfo.isEmpty()) {
-                    needFastStart = true;
-                }
-                brokerSyncStatusInfo.setFastStart(needFastStart);
-                brokerSyncStatusInfo.updateCurrBrokerConfInfo(entity.getManageStatus().getCode(),
-                        entity.isConfDataUpdated(), entity.isBrokerLoaded(),
-                        entity.getBrokerDefaultConfInfo(), brokerTopicSetConfInfoList, false);
-            }
-        }
+        BrokerRunManager brokerRunManager = this.tMaster.getBrokerRunManager();
+        brokerRunManager.updBrokerStaticInfo(this.metaStoreService.getBrokerConfInfo(null));
         isStarted = true;
         serviceStartTime = System.currentTimeMillis();
         logger.info("BrokerConfManager StoreService Started");
@@ -213,14 +172,6 @@ public class MetaDataManager implements Server {
         }
     }
 
-    public void clearBrokerRunSyncManageData() {
-        if (!this.isStarted
-                && this.isStopped) {
-            return;
-        }
-        this.brokerRunSyncManageMap.clear();
-    }
-
     public InetSocketAddress getMasterAddress() {
         return metaStoreService.getMasterAddress();
     }
@@ -229,20 +180,6 @@ public class MetaDataManager implements Server {
         return metaStoreService.getGroupAddressStrInfo();
     }
 
-
-
-    public long getBrokerInfoCheckSum() {
-        return this.brokerInfoCheckSum.get();
-    }
-
-    public ConcurrentHashMap<Integer, String> getBrokersMap(boolean isOverTLS) {
-        if (isOverTLS) {
-            return brokersTLSMap;
-        } else {
-            return brokersMap;
-        }
-    }
-
     /**
      * Check if consume target is authorization or not
      *
@@ -472,7 +409,9 @@ public class MetaDataManager implements Server {
         if (isAddOp) {
             if (metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId()) == null &&
                     metaStoreService.getBrokerConfByBrokerIp(entity.getBrokerIp()) == null) {
-                metaStoreService.addBrokerConf(entity, sBuffer, result);
+                if (metaStoreService.addBrokerConf(entity, sBuffer, result)) {
+                    this.tMaster.getBrokerRunManager().updBrokerStaticInfo(entity);
+                }
             } else {
                 result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
                         sBuffer.append("Duplicated broker configure record! query index is :")
@@ -495,15 +434,8 @@ public class MetaDataManager implements Server {
                         entity.getBrokerTLSPort(), entity.getBrokerWebPort(),
                         entity.getRegionId(), entity.getGroupId(),
                         entity.getManageStatus(), entity.getTopicProps())) {
-                    metaStoreService.updBrokerConf(newEntity, sBuffer, result);
-                    // update broker configure change status
-                    BrokerSyncStatusInfo brokerSyncStatusInfo =
-                            getBrokerRunSyncStatusInfo(entity.getBrokerId());
-                    if (result.isSuccess()) {
-                        if (brokerSyncStatusInfo != null) {
-                            updateBrokerConfChanged(entity.getBrokerId(),
-                                    true, true, sBuffer, result);
-                        }
+                    if (metaStoreService.updBrokerConf(newEntity, sBuffer, result)) {
+                        triggerBrokerConfDataSync(entity.getBrokerId(), sBuffer, result);
                     }
                 } else {
                     result.setSuccResult(null);
@@ -528,59 +460,6 @@ public class MetaDataManager implements Server {
         return result.isSuccess();
     }
 
-
-    /**
-     * Delete broker configure information
-     *
-     * @param operator  operator
-     * @param brokerId  need deleted broker id
-     * @param strBuffer  the print information string buffer
-     * @param result     the process result return
-     * @return true if success otherwise false
-     */
-    public boolean confDelBrokerConfig(String operator,
-                                       int brokerId,
-                                       StringBuilder strBuffer,
-                                       ProcessResult result) {
-        if (!metaStoreService.checkStoreStatus(true, result)) {
-            return result.isSuccess();
-        }
-        // valid configure status
-        if (metaStoreService.hasConfiguredTopics(brokerId)) {
-            result.setFailResult(DataOpErrCode.DERR_UNCLEANED.getCode(),
-                    "The broker's topic configure uncleaned!");
-            return result.isSuccess();
-        }
-        BrokerConfEntity curEntity =
-                metaStoreService.getBrokerConfByBrokerId(brokerId);
-        if (curEntity == null) {
-            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
-                    "The broker configure not exist!");
-            return result.isSuccess();
-        }
-        if (curEntity.getManageStatus().isOnlineStatus()) {
-            result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
-                    "Broker manage status is online, please offline first!");
-            return result.isSuccess();
-        }
-        BrokerSyncStatusInfo brokerSyncStatusInfo =
-                this.brokerRunSyncManageMap.get(curEntity.getBrokerId());
-        if (brokerSyncStatusInfo != null) {
-            if (brokerSyncStatusInfo.isBrokerRegister()
-                    && (curEntity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
-                    && brokerSyncStatusInfo.getBrokerRunStatus() != TStatusConstants.STATUS_SERVICE_UNDEFINED)) {
-                result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
-                        "Broker is processing offline event, please wait and try later!");
-                return result.isSuccess();
-            }
-        }
-        if (metaStoreService.delBrokerConf(operator, brokerId, strBuffer, result)) {
-            this.brokerRunSyncManageMap.remove(brokerId);
-            delBrokerRunData(brokerId);
-        }
-        return result.isSuccess();
-    }
-
     /**
      * Get broker configure information
      *
@@ -654,10 +533,8 @@ public class MetaDataManager implements Server {
                     TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
                     TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
                     TBaseConstants.META_VALUE_UNDEFINED, newMngStatus, null)) {
-                metaStoreService.updBrokerConf(newEntry, sBuffer, result);
-                if (result.isSuccess()) {
-                    triggerBrokerConfDataSync(newEntry,
-                            curEntry.getManageStatus().getCode(), true, sBuffer, result);
+                if (metaStoreService.updBrokerConf(newEntry, sBuffer, result)) {
+                    triggerBrokerConfDataSync(newEntry.getBrokerId(), sBuffer, result);
                 }
             } else {
                 result.setSuccResult(null);
@@ -700,8 +577,7 @@ public class MetaDataManager implements Server {
                 retInfo.add(new BrokerProcessResult(brokerId, "", result));
                 continue;
             }
-            triggerBrokerConfDataSync(curEntry,
-                    curEntry.getManageStatus().getCode(), true, sBuffer, result);
+            triggerBrokerConfDataSync(curEntry.getBrokerId(), sBuffer, result);
             retInfo.add(new BrokerProcessResult(brokerId, curEntry.getBrokerIp(), result));
         }
         return retInfo;
@@ -722,55 +598,61 @@ public class MetaDataManager implements Server {
                                                        StringBuilder sBuffer,
                                                        ProcessResult result) {
         List<BrokerProcessResult> retInfo = new ArrayList<>();
-        Map<Integer, BrokerConfEntity> cfmBrokerMap = new HashMap<>();
-        Map<Integer, BrokerConfEntity> tgtBrokerConfMap =
-                getBrokerConfInfo(brokerIdSet, null, null);
-        // check target broker configure's status
-        for (BrokerConfEntity entity : tgtBrokerConfMap.values()) {
-            if (entity == null) {
+        for (int brokerId : brokerIdSet) {
+            // check broker status
+            if (!isAllowDeleteBrokerConf(brokerId, rsvData, sBuffer, result)) {
+                retInfo.add(new BrokerProcessResult(brokerId, "", result));
                 continue;
             }
-            if (!isMatchDeleteConds(entity.getBrokerId(),
-                    entity.getManageStatus(), rsvData, sBuffer, result)) {
-                retInfo.add(new BrokerProcessResult(
-                        entity.getBrokerId(), entity.getBrokerIp(), result));
-            }
-            cfmBrokerMap.put(entity.getBrokerId(), entity);
-        }
-        if (cfmBrokerMap.isEmpty()) {
-            return retInfo;
-        }
-        // execute delete operation
-        for (BrokerConfEntity entry : cfmBrokerMap.values()) {
-            if (entry == null) {
+            BrokerConfEntity entity =
+                    metaStoreService.getBrokerConfByBrokerId(brokerId);
+            if (entity == null) {
+                result.setSuccResult(null);
+                retInfo.add(new BrokerProcessResult(brokerId, "", result));
                 continue;
             }
-            delBrokerConfig(operator, entry.getBrokerId(), rsvData, sBuffer, result);
-            retInfo.add(new BrokerProcessResult(
-                    entry.getBrokerId(), entry.getBrokerIp(), result));
+            delBrokerConfig(operator, entity.getBrokerId(), rsvData, sBuffer, result);
+            retInfo.add(new BrokerProcessResult(entity.getBrokerId(),
+                    entity.getBrokerIp(), result));
         }
         return retInfo;
     }
 
-    private boolean isMatchDeleteConds(int brokerId, ManageStatus brokerStatus,
-                                       boolean rsvData, StringBuilder sBuffer,
-                                       ProcessResult result) {
-        Map<String, TopicDeployEntity> topicConfigMap =
-                getBrokerTopicConfEntitySet(brokerId);
-        if (topicConfigMap == null || topicConfigMap.isEmpty()) {
+    private boolean isAllowDeleteBrokerConf(int brokerId, boolean rsvData,
+                                            StringBuilder sBuffer, ProcessResult result) {
+        BrokerConfEntity entity =
+                metaStoreService.getBrokerConfByBrokerId(brokerId);
+        if (entity == null) {
             result.setSuccResult(null);
             return result.isSuccess();
         }
-        if (WebParameterUtils.checkBrokerInOfflining(brokerId,
-                brokerStatus.getCode(), this)) {
+        // check broker's manage status
+        if (entity.getManageStatus().isOnlineStatus()) {
+            result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+                    "Broker manage status is online, please offline first!");
+            return result.isSuccess();
+        }
+        BrokerRunManager brokerRunManager = tMaster.getBrokerRunManager();
+        BrokerRunStatusInfo runStatusInfo =
+                brokerRunManager.getBrokerRunStatusInfo(brokerId);
+        if (runStatusInfo != null
+                && entity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
+                && runStatusInfo.inProcessingStatus()) {
             result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
                     sBuffer.append("Illegal value: the broker is processing offline event by brokerId=")
-                            .append(brokerId).append(", please wait and try later!").toString());
+                            .append(brokerId).append(", please wait and try later!").toString());
             sBuffer.delete(0, sBuffer.length());
             return result.isSuccess();
         }
+        // check broker's topic configures
+        Map<String, TopicDeployEntity> topiConfMap =
+                metaStoreService.getConfiguredTopicInfo(brokerId);
+        if (topiConfMap == null || topiConfMap.isEmpty()) {
+            result.setSuccResult(null);
+            return result.isSuccess();
+        }
         if (rsvData) {
-            for (Map.Entry<String, TopicDeployEntity> entry : topicConfigMap.entrySet()) {
+            for (Map.Entry<String, TopicDeployEntity> entry : topiConfMap.entrySet()) {
                 if (entry.getValue() == null) {
                     continue;
                 }
@@ -779,8 +661,7 @@ public class MetaDataManager implements Server {
                     result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
                             sBuffer.append("The topic ").append(entry.getKey())
                                     .append("'s acceptPublish and acceptSubscribe parameters")
-                                    .append(" must be false in broker=")
-                                    .append(brokerId)
+                                    .append(" must be false in broker=").append(brokerId)
                                     .append(" before broker delete by reserve data method!").toString());
                     sBuffer.delete(0, sBuffer.length());
                     return result.isSuccess();
@@ -788,8 +669,7 @@ public class MetaDataManager implements Server {
             }
         } else {
             result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
-                    sBuffer.append("Topic configure of broker by brokerId=")
-                            .append(brokerId)
+                    sBuffer.append("Topic configure of broker by brokerId=").append(brokerId)
                             .append(" not deleted, please delete broker's topic configure first!").toString());
             sBuffer.delete(0, sBuffer.length());
             return result.isSuccess();
@@ -798,7 +678,6 @@ public class MetaDataManager implements Server {
         return result.isSuccess();
     }
 
-
     private boolean delBrokerConfig(String operator, int brokerId, boolean rsvData,
                                     StringBuilder strBuffer, ProcessResult result) {
         BrokerConfEntity curEntity =
@@ -828,20 +707,23 @@ public class MetaDataManager implements Server {
                     "Broker manage status is online, please offline first!");
             return result.isSuccess();
         }
-        BrokerSyncStatusInfo brokerSyncStatusInfo =
-                this.brokerRunSyncManageMap.get(curEntity.getBrokerId());
-        if (brokerSyncStatusInfo != null) {
-            if (brokerSyncStatusInfo.isBrokerRegister()
-                    && (curEntity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
-                    && brokerSyncStatusInfo.getBrokerRunStatus() != TStatusConstants.STATUS_SERVICE_UNDEFINED)) {
+        BrokerRunManager brokerRunManager = this.tMaster.getBrokerRunManager();
+        BrokerRunStatusInfo runStatusInfo = brokerRunManager.getBrokerRunStatusInfo(brokerId);
+        if (runStatusInfo != null) {
+            if ((curEntity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
+                    && runStatusInfo.inProcessingStatus())) {
                 result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
                         "Broker is processing offline event, please wait and try later!");
                 return result.isSuccess();
             }
         }
         if (metaStoreService.delBrokerConf(operator, brokerId, strBuffer, result)) {
-            this.brokerRunSyncManageMap.remove(brokerId);
-            delBrokerRunData(brokerId);
+            brokerRunManager.delBrokerStaticInfo(brokerId);
+            // runStatusInfo is null when no run-status record exists for this broker;
+            // only release run info when there is a record to take the create id from
+            if (runStatusInfo != null) {
+                brokerRunManager.releaseBrokerRunInfo(brokerId, runStatusInfo.getCreateId());
+            }
         }
         return result.isSuccess();
     }
@@ -849,74 +727,28 @@ public class MetaDataManager implements Server {
     /**
      * Manual reload broker config info
      *
-     * @param entity
-     * @param oldManageStatus
-     * @param needFastStart
+     * @param brokerId   id of the broker whose run-status record is looked up and notified of the change
+     * @param strBuffer  scratch buffer used to build the log message; emptied again after logging
+     * @param result     holder that receives the outcome of the call
      * @return true if success otherwise false
      * @throws Exception
      */
-    public boolean triggerBrokerConfDataSync(BrokerConfEntity entity,
-                                             int oldManageStatus,
-                                             boolean needFastStart,
+    public boolean triggerBrokerConfDataSync(int brokerId,
                                              StringBuilder strBuffer,
                                              ProcessResult result) {
         if (!metaStoreService.checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        BrokerConfEntity curEntity =
-                metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId());
-        if (curEntity == null) {
-            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
-                    "The broker configure not exist!");
-            return result.isSuccess();
-        }
-        String curBrokerConfStr = curEntity.getBrokerDefaultConfInfo();
-        Map<String, String> curBrokerTopicConfStrSet =
-                getBrokerTopicStrConfigInfo(curEntity, strBuffer);
-        List<String> brokerTopicSetConfInfoList =
-                new ArrayList<>(curBrokerTopicConfStrSet.size());
-        for (String topicItem : curBrokerTopicConfStrSet.values()) {
-            brokerTopicSetConfInfoList.add(topicItem);
-        }
-        BrokerSyncStatusInfo brokerSyncStatusInfo =
-                this.brokerRunSyncManageMap.get(entity.getBrokerId());
-        if (brokerSyncStatusInfo == null) {
-            brokerSyncStatusInfo =
-                    new BrokerSyncStatusInfo(entity, brokerTopicSetConfInfoList);
-            BrokerSyncStatusInfo tmpBrokerSyncStatusInfo =
-                    brokerRunSyncManageMap.putIfAbsent(entity.getBrokerId(), brokerSyncStatusInfo);
-            if (tmpBrokerSyncStatusInfo != null) {
-                brokerSyncStatusInfo = tmpBrokerSyncStatusInfo;
-            }
-        }
-        if (brokerSyncStatusInfo.isBrokerRegister()
-                && brokerSyncStatusInfo.getBrokerRunStatus() != TStatusConstants.STATUS_SERVICE_UNDEFINED) {
-            result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
-                    strBuffer.append("The broker is processing online event(")
-                    .append(brokerSyncStatusInfo.getBrokerRunStatus())
-                    .append("), please try later! ").toString());
-            strBuffer.delete(0, strBuffer.length());
+        BrokerRunManager brokerRunManager = this.tMaster.getBrokerRunManager();
+        BrokerRunStatusInfo runStatusInfo =
+                brokerRunManager.getBrokerRunStatusInfo(brokerId);
+        if (runStatusInfo == null) {
+            result.setSuccResult(null);
             return result.isSuccess();
         }
-        if (brokerSyncStatusInfo.isFastStart()) {
-            brokerSyncStatusInfo.setFastStart(needFastStart);
-        }
-        int curManageStatus = curEntity.getManageStatus().getCode();
-        if (curManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE
-                || curManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE
-                || curManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
-            boolean isOnlineUpdate =
-                    (oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE
-                            || oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE
-                            || oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ);
-            brokerSyncStatusInfo.updateCurrBrokerConfInfo(curManageStatus,
-                    curEntity.isConfDataUpdated(), curEntity.isBrokerLoaded(), curBrokerConfStr,
-                    brokerTopicSetConfInfoList, isOnlineUpdate);
-        } else {
-            brokerSyncStatusInfo.setBrokerOffline();
-        }
-        strBuffer.append("triggered broker syncStatus info is ");
-        logger.info(brokerSyncStatusInfo.toJsonString(strBuffer, false).toString());
+        runStatusInfo.notifyDataChanged();
+        strBuffer.append("[Meta data] triggered broker syncStatus info is ");
+        logger.info(runStatusInfo.toJsonString(strBuffer).toString());
         strBuffer.delete(0, strBuffer.length());
         result.setSuccResult(null);
         return result.isSuccess();
@@ -1021,14 +853,6 @@ public class MetaDataManager implements Server {
     }
 
 
-    public ConcurrentHashMap<Integer, BrokerSyncStatusInfo> getBrokerRunSyncManageMap() {
-        return this.brokerRunSyncManageMap;
-    }
-
-    public BrokerSyncStatusInfo getBrokerRunSyncStatusInfo(int brokerId) {
-        return this.brokerRunSyncManageMap.get(brokerId);
-    }
-
     public BrokerConfEntity getBrokerConfByBrokerId(int brokerId) {
         return metaStoreService.getBrokerConfByBrokerId(brokerId);
     }
@@ -1041,154 +865,6 @@ public class MetaDataManager implements Server {
         return metaStoreService.getConfiguredTopicInfo(brokerId);
     }
 
-    public ConcurrentHashMap<String/* topicName */, TopicInfo> getBrokerRunTopicInfoMap(
-            final int brokerId) {
-        return this.brokerRunTopicInfoStoreMap.get(brokerId);
-    }
-
-    public void removeBrokerRunTopicInfoMap(final int brokerId) {
-        this.brokerRunTopicInfoStoreMap.remove(brokerId);
-    }
-
-    public void updateBrokerRunTopicInfoMap(final int brokerId,
-                                            ConcurrentHashMap<String, TopicInfo> topicInfoMap) {
-        this.brokerRunTopicInfoStoreMap.put(brokerId, topicInfoMap);
-    }
-
-    public void resetBrokerReportInfo(final int brokerId) {
-        BrokerSyncStatusInfo brokerSyncStatusInfo =
-                brokerRunSyncManageMap.get(brokerId);
-        if (brokerSyncStatusInfo != null) {
-            brokerSyncStatusInfo.resetBrokerReportInfo();
-        }
-        brokerRunTopicInfoStoreMap.remove(brokerId);
-    }
-
-    /**
-     * Update broker config
-     *
-     * @param brokerId
-     * @param isChanged
-     * @param isFasterStart
-     * @return true if success otherwise false
-     */
-    public boolean updateBrokerConfChanged(int brokerId,
-                                           boolean isChanged,
-                                           boolean isFasterStart,
-                                           StringBuilder strBuffer,
-                                           ProcessResult result) {
-        if (!metaStoreService.checkStoreStatus(true, result)) {
-            return result.isSuccess();
-        }
-        BrokerConfEntity curEntity =
-                metaStoreService.getBrokerConfByBrokerId(brokerId);
-        if (curEntity == null) {
-            return false;
-        }
-        // This function needs to be optimized continue
-        if (isChanged) {
-            if (!curEntity.isConfDataUpdated()) {
-                curEntity.setConfDataUpdated();
-                modBrokerConfig(curEntity, strBuffer, result);
-            }
-            if (curEntity.getManageStatus().isApplied()) {
-                BrokerSyncStatusInfo brokerSyncStatusInfo =
-                        brokerRunSyncManageMap.get(curEntity.getBrokerId());
-                if (brokerSyncStatusInfo == null) {
-                    Map<String, String> newBrokerTopicConfStrSet =
-                            getBrokerTopicStrConfigInfo(curEntity, strBuffer);
-                    List<String> brokerTopicSetConfInfoList =
-                            new ArrayList<>(newBrokerTopicConfStrSet.size());
-                    for (String topicItem : newBrokerTopicConfStrSet.values()) {
-                        brokerTopicSetConfInfoList.add(topicItem);
-                    }
-                    brokerSyncStatusInfo =
-                            new BrokerSyncStatusInfo(curEntity, brokerTopicSetConfInfoList);
-                    BrokerSyncStatusInfo tmpBrokerSyncStatusInfo =
-                            brokerRunSyncManageMap.putIfAbsent(curEntity.getBrokerId(), brokerSyncStatusInfo);
-                    if (tmpBrokerSyncStatusInfo != null) {
-                        brokerSyncStatusInfo = tmpBrokerSyncStatusInfo;
-                    }
-                }
-                if (brokerSyncStatusInfo.isFastStart()) {
-                    brokerSyncStatusInfo.setFastStart(isFasterStart);
-                }
-                if (!brokerSyncStatusInfo.isBrokerConfChanged()) {
-                    brokerSyncStatusInfo.setBrokerConfChanged();
-                }
-            }
-        } else {
-            if (curEntity.isConfDataUpdated()) {
-                curEntity.setBrokerLoaded();
-                modBrokerConfig(curEntity, strBuffer, result);
-            }
-            if (curEntity.getManageStatus().isApplied()) {
-                BrokerSyncStatusInfo brokerSyncStatusInfo =
-                        brokerRunSyncManageMap.get(curEntity.getBrokerId());
-                if (brokerSyncStatusInfo == null) {
-                    Map<String, String> newBrokerTopicConfStrSet =
-                            getBrokerTopicStrConfigInfo(curEntity, strBuffer);
-                    List<String> brokerTopicSetConfInfoList =
-                            new ArrayList<>(newBrokerTopicConfStrSet.size());
-                    for (String topicItem : newBrokerTopicConfStrSet.values()) {
-                        brokerTopicSetConfInfoList.add(topicItem);
-                    }
-                    brokerSyncStatusInfo =
-                            new BrokerSyncStatusInfo(curEntity, brokerTopicSetConfInfoList);
-                    BrokerSyncStatusInfo tmpBrokerSyncStatusInfo =
-                            brokerRunSyncManageMap.putIfAbsent(curEntity.getBrokerId(),
-                                    brokerSyncStatusInfo);
-                    if (tmpBrokerSyncStatusInfo != null) {
-                        brokerSyncStatusInfo = tmpBrokerSyncStatusInfo;
-                    }
-                }
-                if (brokerSyncStatusInfo.isBrokerConfChanged()) {
-                    brokerSyncStatusInfo.setBrokerLoaded();
-                    brokerSyncStatusInfo.setFastStart(isFasterStart);
-                }
-            }
-        }
-        return true;
-    }
-
-    public void updateBrokerMaps(BrokerConfEntity entity) {
-        if (entity != null) {
-            String brokerReg =
-                    this.brokersMap.putIfAbsent(entity.getBrokerId(),
-                            entity.getSimpleBrokerInfo());
-            String brokerTLSReg =
-                    this.brokersTLSMap.putIfAbsent(entity.getBrokerId(),
-                            entity.getSimpleTLSBrokerInfo());
-            if (brokerReg == null
-                    || brokerTLSReg == null
-                    || !brokerReg.equals(entity.getSimpleBrokerInfo())
-                    || !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
-                if (brokerReg != null
-                        && !brokerReg.equals(entity.getSimpleBrokerInfo())) {
-                    this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo());
-                }
-                if (brokerTLSReg != null
-                        && !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
-                    this.brokersTLSMap.put(entity.getBrokerId(), entity.getSimpleTLSBrokerInfo());
-                }
-                this.lastBrokerUpdatedTime = System.currentTimeMillis();
-                this.brokerInfoCheckSum.set(this.lastBrokerUpdatedTime);
-            }
-        }
-    }
-
-    public void delBrokerRunData(int brokerId) {
-        if (brokerId == TBaseConstants.META_VALUE_UNDEFINED) {
-            return;
-        }
-        String brokerReg = this.brokersMap.remove(brokerId);
-        String brokerTLSReg = this.brokersTLSMap.remove(brokerId);
-        if (brokerReg != null || brokerTLSReg != null) {
-            this.lastBrokerUpdatedTime = System.currentTimeMillis();
-            this.brokerInfoCheckSum.set(this.lastBrokerUpdatedTime);
-        }
-    }
-
     // ////////////////////////////////////////////////////////////////////////////
 
     public TopicProcessResult addOrUpdTopicDeployInfo(boolean isAddOp, BaseEntity opEntity,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index 60b5ac5..ad6afa0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -227,14 +227,15 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
         // get broker configures
+        // a null qryEntity means no filter conditions were supplied, so it must not exclude entries
         if (qryBrokerKey == null) {
             for (BrokerConfEntity entity :  brokerConfCache.values()) {
-                if (entity != null && entity.isMatched(qryEntity)) {
+                if (entity != null && (qryEntity == null || entity.isMatched(qryEntity))) {
                     retMap.put(entity.getBrokerId(), entity);
                 }
             }
         } else {
             for (Integer brokerId : qryBrokerKey) {
                 BrokerConfEntity entity = brokerConfCache.get(brokerId);
-                if (entity != null && entity.isMatched(qryEntity)) {
+                if (entity != null && (qryEntity == null || entity.isMatched(qryEntity))) {
                     retMap.put(entity.getBrokerId(), entity);
                 }
             }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
similarity index 83%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 0040d5b..51f886d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -18,9 +18,7 @@
 package org.apache.tubemq.server.master.nodemanage.nodebroker;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -29,8 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.common.statusdef.ManageStatus;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.metamanage.MetaDataManager;
@@ -40,37 +37,24 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class BrokerInfoHolder {
+public class BrokerAbnHolder {
     private static final Logger logger =
-            LoggerFactory.getLogger(BrokerInfoHolder.class);
-    private final ConcurrentHashMap<Integer/* brokerId */, BrokerInfo> brokerInfoMap =
-            new ConcurrentHashMap<>();
+            LoggerFactory.getLogger(BrokerAbnHolder.class);
     private final ConcurrentHashMap<Integer/* brokerId */, BrokerAbnInfo> brokerAbnormalMap =
             new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Integer/* brokerId */, BrokerFbdInfo> brokerForbiddenMap =
             new ConcurrentHashMap<>();
     private final int maxAutoForbiddenCnt;
     private final MetaDataManager metaDataManager;
-    private AtomicInteger brokerTotalCount = new AtomicInteger(0);
     private AtomicInteger brokerForbiddenCount = new AtomicInteger(0);
 
 
-    public BrokerInfoHolder(final int maxAutoForbiddenCnt,
-                            final MetaDataManager metaDataManager) {
+    public BrokerAbnHolder(final int maxAutoForbiddenCnt,
+                           final MetaDataManager metaDataManager) {
         this.maxAutoForbiddenCnt = maxAutoForbiddenCnt;
         this.metaDataManager = metaDataManager;
     }
 
-    public BrokerInfo getBrokerInfo(int brokerId) {
-        return brokerInfoMap.get(brokerId);
-    }
-
-    public void setBrokerInfo(int brokerId, BrokerInfo brokerInfo) {
-        if (brokerInfoMap.put(brokerId, brokerInfo) == null) {
-            this.brokerTotalCount.incrementAndGet();
-        }
-    }
-
     public void updateBrokerReportStatus(int brokerId,
                                          int reportReadStatus,
                                          int reportWriteStatus) {
@@ -156,44 +140,15 @@ public class BrokerInfoHolder {
         }
     }
 
-    public void setBrokerHeartBeatReqStatus(int brokerId,
-                                            ClientMaster.HeartResponseM2B.Builder builder) {
+    public Tuple2<Boolean, Boolean> getBrokerAutoFbdStatus(int brokerId) {
+        // NOTE(review): a missing record yields (false, false) but a record yields accept flags; confirm polarity
         BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.get(brokerId);
         if (brokerFbdInfo == null) {
-            builder.setStopWrite(false);
-            builder.setStopRead(false);
-        } else {
-            switch (brokerFbdInfo.newStatus) {
-                case STATUS_MANAGE_ONLINE_NOT_READ: {
-                    builder.setStopWrite(false);
-                    builder.setStopRead(true);
-                }
-                break;
-                case STATUS_MANAGE_ONLINE_NOT_WRITE: {
-                    builder.setStopWrite(true);
-                    builder.setStopRead(false);
-                }
-                break;
-                case STATUS_MANAGE_OFFLINE: {
-                    builder.setStopWrite(true);
-                    builder.setStopRead(true);
-                }
-                break;
-                case STATUS_MANAGE_ONLINE:
-                default: {
-                    builder.setStopWrite(false);
-                    builder.setStopRead(false);
-                }
-            }
+            return new Tuple2<>(false, false);
         }
-    }
-
-    public Map<Integer, BrokerInfo> getBrokerInfos(Collection<Integer> brokerIds) {
-        HashMap<Integer, BrokerInfo> brokerMap = new HashMap<>();
-        for (Integer brokerId : brokerIds) {
-            brokerMap.put(brokerId, brokerInfoMap.get(brokerId));
-        }
-        return brokerMap;
+        // F0 <- newStatus.isAcceptPublish(), F1 <- newStatus.isAcceptSubscribe()
+        return new Tuple2<>(brokerFbdInfo.newStatus.isAcceptPublish(),
+                brokerFbdInfo.newStatus.isAcceptSubscribe());
     }
 
     /**
@@ -202,25 +157,15 @@ public class BrokerInfoHolder {
      * @param brokerId
      * @return the deleted broker info
      */
-    public BrokerInfo removeBroker(Integer brokerId) {
-        BrokerInfo brokerInfo = brokerInfoMap.remove(brokerId);
+    public void removeBroker(Integer brokerId) {
         brokerAbnormalMap.remove(brokerId);
         BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.remove(brokerId);
-        if (brokerInfo != null) {
-            this.brokerTotalCount.decrementAndGet();
-        }
         if (brokerFbdInfo != null) {
             this.brokerForbiddenCount.decrementAndGet();
         }
-        return brokerInfo;
-    }
-
-    public Map<Integer, BrokerInfo> getBrokerInfoMap() {
-        return brokerInfoMap;
     }
 
     public void clear() {
-        brokerInfoMap.clear();
         brokerForbiddenCount.set(0);
         brokerAbnormalMap.clear();
         brokerForbiddenMap.clear();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
index 3fd9cd4..50a3dc4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
@@ -75,6 +75,11 @@ public class BrokerPSInfoHolder {
         }
     }
 
+    public Tuple2<Boolean, Boolean> getBrokerPubStatus(int brokerId) {
+        // F0: brokerId is in enablePubBrokerIdSet; F1: brokerId is in enableSubBrokerIdSet
+        return new Tuple2<>(enablePubBrokerIdSet.contains(brokerId), enableSubBrokerIdSet.contains(brokerId));
+    }
+
     /**
      * update broker's subscribe topicInfo configures
      *
@@ -131,11 +136,11 @@ public class BrokerPSInfoHolder {
     }
 
     /**
-     * Gets the list of topic partitions whose subscribe status is enabled
+     * Gets the map of topic partitions whose subscribe status is enabled
      *
      * @param topicSet need query topic set
      */
-    public List<Partition> getAcceptSubParts(Set<String> topicSet) {
+    public Map<String, Partition> getAcceptSubParts(Set<String> topicSet) {
         return subTopicInfoView.getAcceptSubParts(topicSet, enableSubBrokerIdSet);
     }
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
index b37e37a..28f8832 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
@@ -19,35 +19,64 @@ package org.apache.tubemq.server.master.nodemanage.nodebroker;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
+import org.apache.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.tubemq.corebase.cluster.Partition;
 import org.apache.tubemq.corebase.cluster.TopicInfo;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2B;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2B;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.corebase.utils.Tuple3;
 import org.apache.tubemq.server.common.statusdef.ManageStatus;
 import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
 
 
 public interface BrokerRunManager {
 
-    boolean brokerRegister2M(String clientId, boolean enableTls,
-                             int tlsPort, long reportConfigId,
-                             int reportCheckSumId, boolean isTackData,
-                             String repBrokerConfInfo,
-                             List<String> repTopicConfInfo,
-                             boolean isOnline, boolean isOverTLS,
-                             StringBuilder sBuffer, ProcessResult result);
-
-    boolean brokerHeartBeat2M(int brokerId, long reportConfigId,
-                              int reportCheckSumId, boolean isTackData,
-                              String repBrokerConfInfo,
-                              List<String> repTopicConfInfo, boolean isOnline,
+    void updBrokerStaticInfo(Map<Integer, BrokerConfEntity> brokerConfMap);
+
+    void updBrokerStaticInfo(BrokerConfEntity entity);
+
+    Tuple2<Long, Map<Integer, String>> getBrokerStaticInfo(boolean isOverTLS);
+
+    void delBrokerStaticInfo(int brokerId);
+
+    boolean brokerRegister2M(String clientId, BrokerInfo brokerInfo,
+                             long reportConfigId, int reportCheckSumId,
+                             boolean isTackData, String repBrokerConfInfo,
+                             List<String> repTopicConfInfo, boolean isOnline,
+                             boolean isOverTLS, StringBuilder sBuffer,
+                             ProcessResult result);
+
+    boolean brokerHeartBeat2M(int brokerId, long reportConfigId, int reportCheckSumId,
+                              boolean isTackData, String repBrokerConfInfo,
+                              List<String> repTopicConfInfo,
+                              boolean isTackRmvInfo, List<String> removedTopics,
+                              int rptReadStatus, int rptWriteStatus, boolean isOnline,
                               StringBuilder sBuffer, ProcessResult result);
 
-    boolean brokerClose2M(int brokerId);
+    boolean brokerClose2M(int brokerId, StringBuilder sBuffer, ProcessResult result);
+
+    boolean releaseBrokerRunInfo(int brokerId, String blockId);
 
-    boolean brokerTimeout(int brokerId, long bookedId);
+    BrokerRunStatusInfo getBrokerRunStatusInfo(int brokerId);
 
+    Tuple2<Boolean, Boolean> getBrokerPublishStatus(int brokerId);
 
     Tuple3<ManageStatus, String, Map<String, String>> getBrokerMetaConfigInfo(int brokerId);
 
+    void setRegisterDownConfInfo(int brokerId, StringBuilder sBuffer,
+                                 RegisterResponseM2B.Builder builder);
+
+    void setHeatBeatDownConfInfo(int brokerId, StringBuilder sBuffer,
+                                 HeartResponseM2B.Builder builder);
+
+    BrokerInfo getBrokerInfo(int brokerId);
+
+    Map<Integer, BrokerInfo> getBrokerInfoMap(List<Integer> brokerIds);
+
     void updBrokerCsmConfInfo(int brokerId,
                               ManageStatus mngStatus,
                               Map<String, TopicInfo> topicInfoMap);
@@ -56,4 +85,18 @@ public interface BrokerRunManager {
                               ManageStatus mngStatus,
                               Map<String, TopicInfo> topicInfoMap);
 
+    BrokerAbnHolder getBrokerAbnHolder();
+
+    Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> topicSet);
+
+    int getSubTopicMaxBrokerCount(Set<String> topicSet);
+
+    Map<String, Partition> getSubBrokerAcceptSubParts(Set<String> topicSet);
+
+    List<Partition> getSubBrokerAcceptSubParts(String topic);
+
+    TopicInfo getPubBrokerTopicInfo(int brokerId, String topic);
+
+    List<TopicInfo> getPubBrokerPushedTopicInfo(int brokerId);
+
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
index 45c5459..80ffd80 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
@@ -36,6 +36,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+
+
 public class BrokerRunStatusInfo {
 
     private static final Logger logger =
@@ -46,7 +48,7 @@ public class BrokerRunStatusInfo {
     private final BrokerRunManager brokerRunManager;
 
     private BrokerInfo brokerInfo;
-    private long createId;
+    private String createId;
     // config change flag
     private AtomicBoolean isConfChanged = new AtomicBoolean(false);
     private AtomicLong confChangeNo =
@@ -60,7 +62,7 @@ public class BrokerRunStatusInfo {
     // broker sync data info
     BrokerSyncData brokerSyncData = new BrokerSyncData();
     // broker status conditions
-    private boolean isOnline = false;     // broker online flag
+    private boolean isOnline = false;       // broker online flag
     private boolean isDoneDataLoad = false;
     private boolean isDoneDataSub = false;
     private boolean isDoneDataPub = false;
@@ -83,7 +85,7 @@ public class BrokerRunStatusInfo {
                                     String brokerConfInfo, Map<String, String> topicConfInfoMap,
                                     boolean isOverTls) {
         resetStatusInfo();
-        this.createId = System.nanoTime();
+        this.createId = String.valueOf(System.nanoTime());
         this.brokerInfo = brokerInfo;
         long curTime = System.currentTimeMillis();
         this.isOverTLS = isOverTls;
@@ -111,20 +113,37 @@ public class BrokerRunStatusInfo {
                 && this.confChangeNo.get() != this.confLoadedNo.get()));
     }
 
-    public long getCreateId() {
+    public String getCreateId() {
         return this.createId;
     }
 
+    public boolean isOverTLS() {
+        return isOverTLS;
+    }
+
+    public BrokerInfo getBrokerInfo() {
+        return brokerInfo;
+    }
+
+    public boolean inProcessingStatus() {
+        return curStepStatus != StepStatus.STEP_STATUS_UNDEFINED;
+    }
+
+    public StepStatus getCurStepStatus() {
+        return curStepStatus;
+    }
+
+    public boolean isOnline() {
+        return isOnline;
+    }
 
     /**
      * Get need sync to broker's data
      *
-     * @param retValue return data container, ** must not null **
-     * @return void
+     * @return the tuple produced by brokerSyncData.getBrokerSyncData(), passed through unchanged
      */
-    public void getNeedSyncData(Tuple4<Long,
-            Integer, String, Map<String, String>> retValue) {
-        brokerSyncData.getBrokerSyncData(retValue);
+    public Tuple4<Long, Integer, String, List<String>> getNeedSyncData() {
+        return brokerSyncData.getBrokerSyncData();
     }
     /**
@@ -151,6 +170,31 @@ public class BrokerRunStatusInfo {
         goNextStatus(isRegister, isSynchronized, sBuffer);
     }
 
+    /* Append this status to sBuffer as a "BrokerRunStatusInfo":{...} JSON member and return sBuffer. */
+    /* NOTE(review): the '{' opened below is not closed in this method; confirm the caller appends '}'. */
+    public StringBuilder toJsonString(StringBuilder sBuffer) {
+        Tuple2<Boolean, Boolean> confStatusTuple = getDataSyncStatus();
+        sBuffer.append("\"BrokerRunStatusInfo\":{\"type\":\"BrokerRunStatusInfo\"")
+                .append(",\"brokerInfo\":\"").append(brokerInfo.getBrokerStrInfo())
+                .append("\",\"createId\":\"").append(createId)
+                .append("\",\"isConfChanged\":").append(confStatusTuple.getF0())
+                .append(",\"isConfLoaded\":").append(confStatusTuple.getF1())
+                .append(",\"confChangeNo\":").append(confChangeNo.get())
+                .append(",\"curStepStatus\":\"").append(curStepStatus.getDescription())
+                .append("\",\"nextStepOpTimeInMills\":").append(nextStepOpTimeInMills)
+                .append(",\"confLoadedNo\":").append(confLoadedNo.get())
+                .append(",\"isOnline\":").append(isOnline)
+                .append(",\"isDoneDataLoad\":").append(isDoneDataLoad)
+                .append(",\"isDoneDataSub\":").append(isDoneDataSub)
+                .append(",\"isDoneDataPub\":").append(isDoneDataPub)
+                .append(",\"isOverTLS\":").append(isOverTLS)
+                .append(",\"lastBrokerSyncTime\":").append(lastBrokerSyncTime)
+                .append(",\"maxConfLoadedTimeInMs\":").append(maxConfLoadedTimeInMs)
+                .append(",\"curConfLoadTimeInMs\":").append(curConfLoadTimeInMs)
+                .append(",\"BrokerSyncData\":");
+        return brokerSyncData.toJsonString(sBuffer);
+    }
+
     private void goNextStatus(boolean isRegister,
                               boolean isSynchronized,
                               StringBuilder sBuffer) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
index 09de48d..b320ed1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
@@ -166,17 +166,20 @@ public class BrokerSyncData {
     /**
      * Get need sync to broker's data
      *
-     * @param retValue return data container, ** must not null **
-     * @return void
+     * @return (confId, checkSumId, brokerConf, topicConfs); the last two are null when isConfSynchronized() is true
      */
-    public void getBrokerSyncData(Tuple4<Long,
-            Integer, String, Map<String, String>> retValue) {
+    public Tuple4<Long, Integer, String, List<String>> getBrokerSyncData() {
         if (isConfSynchronized()) {
-            retValue.setFieldsValue(syncDownDataConfId.get(), syncDownDataChkSumId,
-                    syncDownBrokerConfInfo, syncDownTopicConfInfoMap);
+            return new Tuple4<>(syncDownDataConfId.get(), syncDownDataChkSumId, null, null);
         } else {
-            retValue.setFieldsValue(syncDownDataConfId.get(),
-                    syncDownDataChkSumId, null, null);
+            List<String> topicInfoList = new ArrayList<>();
+            for (String topicInfo : syncDownTopicConfInfoMap.values()) {
+                if (topicInfo != null) {
+                    topicInfoList.add(topicInfo);
+                }
+            }
+            return new Tuple4<>(syncDownDataConfId.get(), syncDownDataChkSumId,
+                    syncDownBrokerConfInfo, topicInfoList);
         }
     }
 
@@ -214,6 +217,25 @@ public class BrokerSyncData {
                 || !Objects.equals(syncDownTopicConfInfoMap, topicConfInfoMap);
     }
 
+    /* Append this sync data to sBuffer as one JSON object and return sBuffer; string values are not escaped. */
+    public StringBuilder toJsonString(StringBuilder sBuffer) {
+        sBuffer.append("{\"dataPushId\":").append(dataPushId)
+                .append(",\"mngStatus\":\"").append(mngStatus.getDescription())
+                .append("\",\"syncDownDataConfId\":").append(syncDownDataConfId.get())
+                .append(",\"syncDownDataChkSumId\":").append(syncDownDataChkSumId)
+                .append(",\"isStatusChanged\":").append(isStatusChanged)
+                .append(",\"isConfChanged\":").append(isConfChanged)
+                .append(",\"syncDownBrokerConfInfo\":\"").append(syncDownBrokerConfInfo)
+                .append("\",\"syncDownTopicConfInfoMap\":\"").append(syncDownTopicConfInfoMap.toString())
+                .append("\",\"syncUpDataConfId\":").append(syncUpDataConfId)
+                .append(",\"syncUpDataChkSumId\":").append(syncUpDataChkSumId)
+                .append(",\"syncUpBrokerConfInfo\":\"").append(syncUpBrokerConfInfo)
+                .append("\",\"syncUpTopicConfInfos\":\"").append(syncUpTopicConfInfos.toString())
+                .append("\",\"lastDataUpTime\":").append(lastDataUpTime)
+                .append("}");
+        return sBuffer;
+    }
+
     /**
      * parse broker report configure to topicInfo
      *
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
deleted file mode 100644
index d0eedcf..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
+++ /dev/null
@@ -1,843 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.nodemanage.nodebroker;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.StringUtils;
-import org.apache.tubemq.corebase.TokenConstants;
-import org.apache.tubemq.corebase.utils.CheckSum;
-import org.apache.tubemq.corebase.utils.TStringUtils;
-import org.apache.tubemq.corebase.utils.Tuple2;
-import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
-import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.TStatusConstants;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class BrokerSyncStatusInfo {
-    private static final Logger logger =
-            LoggerFactory.getLogger(BrokerSyncStatusInfo.class);
-
-    private boolean isFirstInit = true;
-    private int brokerId = -2;
-    private String brokerIp;        //broker ip
-    private int brokerPort;         //broker port
-    private int brokerTLSPort;      //broker tls port
-
-    /* Broker manager stratus :-2:undefine,1:pending for approval,5:online,7:offline */
-    private int brokerManageStatus = -2;
-
-    /* Broker run status, -2:undefine, 31: online(wait-server), 32:online(readonly), 33:online(read&write)
-    *  51: offline(unreadable&not writeable), 52: offline(wait load balance) */
-    private int brokerRunStatus = -2;
-
-    private long subStepOpTimeInMills = 0;
-    private boolean isBrokerRegister = false;       //broker register flag
-    private boolean isBrokerOnline = false;         //broker online flag
-    private boolean isOverTLS = false;              //enable tls
-    private boolean isBrokerConfChanged = false;     //config change flag
-    private boolean isBrokerLoaded = false;         //broker load status
-    private boolean isFastStart = false;            //enable fast start
-
-    private long lastPushBrokerConfId = -2;
-    private int lastPushBrokerCheckSumId = -2;
-    private long lastDataPushInMills = 0;
-    private String lastPushBrokerDefaultConfInfo;
-    private List<String> lastPushBrokerTopicSetConfInfo =
-            new ArrayList<>();
-
-    private long reportedBrokerConfId = -2;
-    private int reportedBrokerCheckSumId = -2;
-    private long lastDataReportInMills = 0;
-    private String reportedBrokerDefaultConfInfo;
-    private List<String> reportedBrokerTopicSetConfInfo =
-            new ArrayList<>();
-
-    private AtomicLong currBrokerConfId = new AtomicLong(0);
-    private int currBrokerCheckSumId = 0;
-    private String curBrokerDefaultConfInfo;
-    private List<String> curBrokerTopicSetConfInfo =
-            new ArrayList<>();
-
-    private int numPartitions = 1;              //partition number
-    private int numTopicStores = 1;             //store number
-    private int unFlushDataHold = TServerConstants.CFG_DEFAULT_DATA_UNFLUSH_HOLD;
-    private int unflushThreshold = 1000;        //flush threshold
-    private int unflushInterval = 10000;        //flush interval
-    private int memCacheMsgSizeInMB = 3;        //memory cache size
-    private int memCacheMsgCntInK = 10;         //memory cache message count
-    private int memCacheFlushIntvl = 20000;     //memory cache flush interval
-    private String deletePolicy = "delete,168h";    //data delete policy
-    private String deleteWhen = "0 0 6,18 * * ?";   //date delete policy execute time
-    private boolean acceptPublish = true;           //accept publish
-    private boolean acceptSubscribe = true;         //accept subscribe
-
-    //Constructor
-    public BrokerSyncStatusInfo(final BdbBrokerConfEntity bdbEntity,
-                                List<String> brokerTopicSetConfInfo) {
-        updateBrokerConfigureInfo(bdbEntity.getBrokerDefaultConfInfo(),
-                brokerTopicSetConfInfo);
-        this.brokerManageStatus = bdbEntity.getManageStatus();
-        this.isBrokerConfChanged = bdbEntity.isConfDataUpdated();
-        this.isBrokerLoaded = bdbEntity.isBrokerLoaded();
-        this.brokerId = bdbEntity.getBrokerId();
-        this.brokerIp = bdbEntity.getBrokerIp();
-        this.brokerPort = bdbEntity.getBrokerPort();
-        this.brokerTLSPort = bdbEntity.getBrokerTLSPort();
-        this.isFastStart = false;
-        if (this.brokerManageStatus > TStatusConstants.STATUS_MANAGE_APPLY) {
-            currBrokerConfId.incrementAndGet();
-        }
-    }
-
-    //Constructor
-    public BrokerSyncStatusInfo(BrokerConfEntity brokerEntity,
-                                List<String> brokerTopicSetConfInfo) {
-        updateBrokerConfigureInfo(brokerEntity.getBrokerDefaultConfInfo(),
-                brokerTopicSetConfInfo);
-        this.brokerManageStatus = brokerEntity.getManageStatus().getCode();
-        this.isBrokerConfChanged = brokerEntity.isConfDataUpdated();
-        this.isBrokerLoaded = brokerEntity.isBrokerLoaded();
-        this.brokerId = brokerEntity.getBrokerId();
-        this.brokerIp = brokerEntity.getBrokerIp();
-        this.brokerPort = brokerEntity.getBrokerPort();
-        this.brokerTLSPort = brokerEntity.getBrokerTLSPort();
-        this.isFastStart = false;
-        if (this.brokerManageStatus > TStatusConstants.STATUS_MANAGE_APPLY) {
-            currBrokerConfId.incrementAndGet();
-        }
-    }
-    /**
-     * Update current broker config info
-     *
-     * @param brokerManageStatus     broker status
-     * @param isBrokerConfChanged
-     * @param isBrokerLoaded
-     * @param brokerDefaultConfInfo  broker default config
-     * @param brokerTopicSetConfInfo topic config
-     * @param isOnlineUpdate
-     */
-    public void updateCurrBrokerConfInfo(int brokerManageStatus, boolean isBrokerConfChanged,
-                                         boolean isBrokerLoaded, String brokerDefaultConfInfo,
-                                         List<String> brokerTopicSetConfInfo,
-                                         boolean isOnlineUpdate) {
-        this.brokerManageStatus = brokerManageStatus;
-        this.currBrokerConfId.incrementAndGet();
-        this.isBrokerConfChanged = isBrokerConfChanged;
-        this.isBrokerLoaded = isBrokerLoaded;
-        updateBrokerConfigureInfo(brokerDefaultConfInfo, brokerTopicSetConfInfo);
-        this.lastPushBrokerConfId = this.currBrokerConfId.get();
-        this.lastPushBrokerCheckSumId = this.currBrokerCheckSumId;
-        this.lastDataPushInMills = System.currentTimeMillis();
-        this.lastPushBrokerDefaultConfInfo = this.curBrokerDefaultConfInfo;
-        this.lastPushBrokerTopicSetConfInfo = this.curBrokerTopicSetConfInfo;
-        switch (this.brokerManageStatus) {
-            case TStatusConstants.STATUS_MANAGE_ONLINE: {
-                this.brokerRunStatus = isOnlineUpdate
-                        ? TStatusConstants.STATUS_SERVICE_TOONLINE_PART_WAIT_REGISTER
-                        : TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_REGISTER;
-                break;
-            }
-            case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE: {
-                this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ;
-                break;
-            }
-            case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ: {
-                this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_WRITE;
-                break;
-            }
-            default: {
-                if (this.isBrokerRegister) {
-                    this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_WRITE;
-                }
-            }
-        }
-        this.subStepOpTimeInMills = System.currentTimeMillis();
-    }
-
-    /**
-     * Reset broker report info
-     */
-    public void resetBrokerReportInfo() {
-        this.reportedBrokerConfId = -2;
-        this.reportedBrokerCheckSumId = -2;
-        this.reportedBrokerDefaultConfInfo = "";
-        this.reportedBrokerTopicSetConfInfo =
-                new ArrayList<>();
-        this.isBrokerRegister = false;
-        this.isBrokerOnline = false;
-        this.isFastStart = false;
-        this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_UNDEFINED;
-        this.subStepOpTimeInMills = System.currentTimeMillis();
-    }
-
-    /**
-     * Set broker report info
-     *
-     * @param isRegister
-     * @param reportConfigId
-     * @param reportCheckSumId
-     * @param isTackData
-     * @param reportDefaultConfInfo
-     * @param reportTopicSetConfInfo
-     * @param isBrokerRegister
-     * @param isBrokerOnline
-     * @param isOverTLS
-     */
-    public void setBrokerReportInfo(boolean isRegister, long reportConfigId,
-                                    int reportCheckSumId, boolean isTackData,
-                                    String reportDefaultConfInfo,
-                                    List<String> reportTopicSetConfInfo,
-                                    boolean isBrokerRegister, boolean isBrokerOnline, boolean isOverTLS) {
-        this.reportedBrokerConfId = reportConfigId;
-        this.reportedBrokerCheckSumId = reportCheckSumId;
-        if (isTackData) {
-            this.reportedBrokerDefaultConfInfo = reportDefaultConfInfo;
-            if (reportTopicSetConfInfo == null) {
-                this.reportedBrokerTopicSetConfInfo = new ArrayList<>();
-            } else {
-                this.reportedBrokerTopicSetConfInfo = reportTopicSetConfInfo;
-            }
-            this.lastDataReportInMills = System.currentTimeMillis();
-        }
-        this.isBrokerRegister = isBrokerRegister;
-        this.isBrokerOnline = isBrokerOnline;
-        this.isOverTLS = isOverTLS;
-        if (isRegister) {
-            if (this.isBrokerOnline) {
-                if (this.reportedBrokerConfId <= 0) {
-                    if (this.isBrokerConfChanged
-                            || !this.isBrokerLoaded
-                            || this.reportedBrokerCheckSumId != this.lastPushBrokerCheckSumId
-                            || !isFirstInit) {
-                        return;
-                    }
-                    this.lastPushBrokerConfId = this.reportedBrokerConfId;
-                    this.currBrokerConfId.set(this.lastPushBrokerConfId);
-                    this.lastPushBrokerCheckSumId = this.currBrokerCheckSumId;
-                    this.lastDataPushInMills = System.currentTimeMillis();
-                    this.brokerRunStatus =
-                            TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ;
-                    this.subStepOpTimeInMills = System.currentTimeMillis();
-                    this.isFirstInit = false;
-                } else {
-                    this.isFirstInit = false;
-                    this.isFastStart = true;
-                    switch (this.brokerManageStatus) {
-                        case TStatusConstants.STATUS_MANAGE_ONLINE: {
-                            this.brokerRunStatus =
-                                    TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ;
-                            break;
-                        }
-                        case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE: {
-                            this.brokerRunStatus =
-                                    TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ;
-                            break;
-                        }
-                        case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ: {
-                            this.brokerRunStatus =
-                                    TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_WRITE;
-                            break;
-                        }
-                        default: {
-                            this.brokerRunStatus =
-                                    TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_WRITE;
-                        }
-                    }
-                    this.subStepOpTimeInMills = 0;
-                }
-            }
-        }
-    }
-
-    /**
-     * Set broker status to offline
-     */
-    public void setBrokerOffline() {
-        if (this.brokerManageStatus != TStatusConstants.STATUS_MANAGE_OFFLINE) {
-            this.brokerManageStatus = TStatusConstants.STATUS_MANAGE_OFFLINE;
-        }
-        if (this.isBrokerOnline) {
-            this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_WRITE;
-        } else if (this.isBrokerRegister) {
-            this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_READ_WRITE;
-        } else {
-            this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE;
-        }
-        this.subStepOpTimeInMills = System.currentTimeMillis();
-    }
-
-    /**
-     * Check if need sync config data to broker
-     *
-     * @return true if need otherwise false
-     */
-    public boolean needSyncConfDataToBroker() {
-        if (this.lastPushBrokerConfId != this.reportedBrokerConfId
-                || this.lastPushBrokerCheckSumId != this.reportedBrokerCheckSumId) {
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * According to last report time and current time to decide if need to report data
-     *
-     * @return true if need report data otherwise false
-     */
-    public boolean needReportData() {
-        if (System.currentTimeMillis() - this.lastDataPushInMills
-                > TServerConstants.CFG_REPORT_DEFAULT_SYNC_DURATION) {
-            this.lastDataPushInMills = System.currentTimeMillis();
-            return true;
-        }
-        return false;
-    }
-
-    public void forceSyncConfDataToBroker() {
-        this.isFastStart = true;
-        this.lastPushBrokerConfId = this.currBrokerConfId.incrementAndGet();
-        this.lastPushBrokerCheckSumId = this.currBrokerCheckSumId;
-        this.lastPushBrokerDefaultConfInfo = this.curBrokerDefaultConfInfo;
-        this.lastPushBrokerTopicSetConfInfo = this.curBrokerTopicSetConfInfo;
-        this.lastDataPushInMills = System.currentTimeMillis();
-    }
-
-    public Long getCurrBrokerConfId() {
-        return currBrokerConfId.get();
-    }
-
-    public int getCurrBrokerCheckSumId() {
-        return currBrokerCheckSumId;
-    }
-
-    public String getCurBrokerDefaultConfInfo() {
-        return curBrokerDefaultConfInfo;
-    }
-
-    public List<String> getCurBrokerTopicSetConfInfo() {
-        return curBrokerTopicSetConfInfo;
-    }
-
-    public long getLastDataReportInMills() {
-        return lastDataReportInMills;
-    }
-
-    public void setLastDataReportInMills(long lastDataReportInMills) {
-        this.lastDataReportInMills = lastDataReportInMills;
-    }
-
-    public boolean isFastStart() {
-        return isFastStart;
-    }
-
-    public void setFastStart(boolean isFastStart) {
-        this.isFastStart = isFastStart;
-    }
-
-    public boolean isBrokerOnline() {
-        return this.isBrokerOnline;
-    }
-
-    public void setBrokerOnline(boolean isBrokerOnline) {
-        this.isBrokerOnline = isBrokerOnline;
-    }
-
-    public boolean isBrokerRegister() {
-        return this.isBrokerRegister;
-    }
-
-    public void setBrokerRunStatus(boolean isBrokerRegister,
-                                   boolean isBrokerOnline) {
-        this.isBrokerRegister = isBrokerRegister;
-        this.isBrokerOnline = isBrokerOnline;
-    }
-
-    public long getReportedBrokerConfId() {
-        return reportedBrokerConfId;
-    }
-
-    public int getNumPartitions() {
-        return numPartitions;
-    }
-
-    public int getUnflushThreshold() {
-        return unflushThreshold;
-    }
-
-    public int getUnflushInterval() {
-        return unflushInterval;
-    }
-
-    public String getDeletePolicy() {
-        return deletePolicy;
-    }
-
-    public String getDeleteWhen() {
-        return deleteWhen;
-    }
-
-    public boolean isAcceptPublish() {
-        return acceptPublish;
-    }
-
-    public boolean isAcceptSubscribe() {
-        return acceptSubscribe;
-    }
-
-    public int getBrokerRunStatus() {
-        return brokerRunStatus;
-    }
-
-    public void setBrokerRunStatus(int brokerRunStatus) {
-        this.brokerRunStatus = brokerRunStatus;
-        this.subStepOpTimeInMills = System.currentTimeMillis();
-    }
-
-    public boolean isOverTLS() {
-        return isOverTLS;
-    }
-
-    public void setOverTLS(boolean overTLS) {
-        isOverTLS = overTLS;
-    }
-
-    public int getBrokerTLSPort() {
-        return brokerTLSPort;
-    }
-
-    public void setBrokerTLSPort(int brokerTLSPort) {
-        this.brokerTLSPort = brokerTLSPort;
-    }
-
-    public long getSubStepOpTimeInMills() {
-        return subStepOpTimeInMills;
-    }
-
-    public int getBrokerId() {
-        return brokerId;
-    }
-
-    public String getBrokerIp() {
-        return brokerIp;
-    }
-
-    public int getBrokerPort() {
-        return brokerPort;
-    }
-
-    public int getBrokerManageStatus() {
-        return brokerManageStatus;
-    }
-
-    private int calculateConfigCrc32Value(final String brokerDefaultConfInfo,
-                                          final List<String> brokerTopicSetConfInfo) {
-        int result = -1;
-        int capacity = 0;
-        Collections.sort(brokerTopicSetConfInfo);
-        capacity += brokerDefaultConfInfo.length();
-        for (String itemStr : brokerTopicSetConfInfo) {
-            capacity += itemStr.length();
-        }
-        capacity *= 2;
-        for (int i = 1; i < 3; i++) {
-            result = inCalcBufferResult(capacity, brokerDefaultConfInfo, brokerTopicSetConfInfo);
-            if (result >= 0) {
-                return result;
-            }
-            capacity *= i + 1;
-        }
-        logger.error("Calc BrokerConfigure Crc error!");
-        return 0;
-    }
-
-    private int inCalcBufferResult(int capacity, final String brokerDefaultConfInfo,
-                                   final List<String> brokerTopicSetConfInfo) {
-        final ByteBuffer buffer = ByteBuffer.allocate(capacity);
-        buffer.put(StringUtils.getBytesUtf8(brokerDefaultConfInfo));
-        for (String itemStr : brokerTopicSetConfInfo) {
-            byte[] itemData = StringUtils.getBytesUtf8(itemStr);
-            if (itemData.length > buffer.remaining()) {
-                return -1;
-            }
-            buffer.put(itemData);
-        }
-        return CheckSum.crc32(buffer.array());
-    }
-
-    /**
-     * Update broker config, field will set to default value if brokerDefaultConfInfo is empty,
-     * else will parse the string value and then set broker config
-     *
-     * @param brokerDefaultConfInfo  a string, field join with ":",
-     * @param brokerTopicSetConfInfo
-     */
-    private void updateBrokerConfigureInfo(String brokerDefaultConfInfo,
-                                           List<String> brokerTopicSetConfInfo) {
-        int crc32CheckSum =
-                calculateConfigCrc32Value(brokerDefaultConfInfo, brokerTopicSetConfInfo);
-        if (crc32CheckSum != this.currBrokerCheckSumId) {
-            this.currBrokerCheckSumId = crc32CheckSum;
-            this.curBrokerTopicSetConfInfo = brokerTopicSetConfInfo;
-            this.curBrokerDefaultConfInfo = brokerDefaultConfInfo;
-            if (TStringUtils.isBlank(brokerDefaultConfInfo)) {
-                this.numPartitions = 1;
-                this.numTopicStores = 1;
-                this.unflushThreshold = 1000;
-                this.unflushInterval = 10000;
-                this.unFlushDataHold = TServerConstants.CFG_DEFAULT_DATA_UNFLUSH_HOLD;
-                this.deletePolicy = "delete,168h";
-                this.deleteWhen = "0 0 6,18 * * ?";
-                this.acceptPublish = true;
-                this.acceptSubscribe = true;
-                this.memCacheFlushIntvl = 20000;
-                this.memCacheMsgCntInK = 10;
-                this.memCacheMsgSizeInMB = 3;
-            } else {
-                String[] brokerDefaultConfInfoArr =
-                        brokerDefaultConfInfo.split(TokenConstants.ATTR_SEP);
-                this.numPartitions = Integer.parseInt(brokerDefaultConfInfoArr[0]);
-                this.acceptPublish = Boolean.parseBoolean(brokerDefaultConfInfoArr[1]);
-                this.acceptSubscribe = Boolean.parseBoolean(brokerDefaultConfInfoArr[2]);
-                this.unflushThreshold = Integer.parseInt(brokerDefaultConfInfoArr[3]);
-                this.unflushInterval = Integer.parseInt(brokerDefaultConfInfoArr[4]);
-                this.deleteWhen = brokerDefaultConfInfoArr[5];
-                this.deletePolicy = brokerDefaultConfInfoArr[6];
-                if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[7])) {
-                    this.numTopicStores = Integer.parseInt(brokerDefaultConfInfoArr[7]);
-                }
-                if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[8])) {
-                    this.unFlushDataHold = Integer.parseInt(brokerDefaultConfInfoArr[8]);
-                }
-                if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[9])) {
-                    this.memCacheMsgSizeInMB = Integer.parseInt(brokerDefaultConfInfoArr[9]);
-                }
-                if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[10])) {
-                    this.memCacheMsgCntInK = Integer.parseInt(brokerDefaultConfInfoArr[10]);
-                }
-                if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[11])) {
-                    this.memCacheFlushIntvl = Integer.parseInt(brokerDefaultConfInfoArr[11]);
-                }
-            }
-        }
-    }
-
-    public long getLastPushBrokerConfId() {
-        return lastPushBrokerConfId;
-    }
-
-    public void setLastPushBrokerConfId(long lastPushBrokerConfId) {
-        this.lastPushBrokerConfId = lastPushBrokerConfId;
-    }
-
-    public int getLastPushBrokerCheckSumId() {
-        return lastPushBrokerCheckSumId;
-    }
-
-    public void setLastPushBrokerCheckSumId(int lastPushBrokerCheckSumId) {
-        this.lastPushBrokerCheckSumId = lastPushBrokerCheckSumId;
-    }
-
-    public long getLastDataPushInMills() {
-        return lastDataPushInMills;
-    }
-
-    public void setLastDataPushInMills(long lastDataPushInMills) {
-        this.lastDataPushInMills = lastDataPushInMills;
-    }
-
-    public String getLastPushBrokerDefaultConfInfo() {
-        return lastPushBrokerDefaultConfInfo;
-    }
-
-    public void setLastPushBrokerDefaultConfInfo(String lastPushBrokerDefaultConfInfo) {
-        this.lastPushBrokerDefaultConfInfo = lastPushBrokerDefaultConfInfo;
-    }
-
-    public List<String> getLastPushBrokerTopicSetConfInfo() {
-        return lastPushBrokerTopicSetConfInfo;
-    }
-
-    public void setLastPushBrokerTopicSetConfInfo(List<String> lastPushBrokerTopicSetConfInfo) {
-        this.lastPushBrokerTopicSetConfInfo = lastPushBrokerTopicSetConfInfo;
-    }
-
-    public String getReportedBrokerDefaultConfInfo() {
-        return reportedBrokerDefaultConfInfo;
-    }
-
-    public void setReportedBrokerDefaultConfInfo(String reportedBrokerDefaultConfInfo) {
-        this.reportedBrokerDefaultConfInfo = reportedBrokerDefaultConfInfo;
-    }
-
-    public List<String> getReportedBrokerTopicSetConfInfo() {
-        return reportedBrokerTopicSetConfInfo;
-    }
-
-    public void setReportedBrokerTopicSetConfInfo(List<String> reportedBrokerTopicSetConfInfo) {
-        this.reportedBrokerTopicSetConfInfo = reportedBrokerTopicSetConfInfo;
-    }
-
-    public boolean isBrokerConfChanged() {
-        return isBrokerConfChanged;
-    }
-
-    public void setBrokerConfChanged() {
-        this.isBrokerConfChanged = true;
-        this.isBrokerLoaded = false;
-    }
-
-    public boolean isBrokerLoaded() {
-        return isBrokerLoaded;
-    }
-
-    public void setBrokerLoaded() {
-        this.isBrokerLoaded = true;
-        this.isBrokerConfChanged = false;
-    }
-
-    // #lizard forgives
-    private StringBuilder getBrokerAndTopicConfJsonInfo(String brokerConfInfo,
-                                                        String brokerJsonKey,
-                                                        List<String> topicConfInfoList,
-                                                        String topicListJsonKey,
-                                                        final StringBuilder strBuffer) {
-        // format config to json
-        strBuffer.append(",\"").append(brokerJsonKey).append("\":");
-        if (TStringUtils.isBlank(brokerConfInfo)) {
-            strBuffer.append("{},\"").append(topicListJsonKey).append("\":[]");
-            return strBuffer;
-        }
-        // broker default metadata
-        String[] brokerDefaultConfInfoArr =
-                brokerConfInfo.split(TokenConstants.ATTR_SEP);
-        final int numPartitions = Integer.parseInt(brokerDefaultConfInfoArr[0]);
-        final boolean acceptPublish = Boolean.parseBoolean(brokerDefaultConfInfoArr[1]);
-        final boolean acceptSubscribe = Boolean.parseBoolean(brokerDefaultConfInfoArr[2]);
-        final int unflushThreshold = Integer.parseInt(brokerDefaultConfInfoArr[3]);
-        final int unflushInterval = Integer.parseInt(brokerDefaultConfInfoArr[4]);
-        final String deleteWhen = brokerDefaultConfInfoArr[5];
-        final String deletePolicy = brokerDefaultConfInfoArr[6];
-        int numTopicStores = 1;
-        int unFlushDataHold = TServerConstants.CFG_DEFAULT_DATA_UNFLUSH_HOLD;
-        int memCacheMsgSizeInMB = 3;
-        int memCacheMsgCntInK = 10;
-        int memCacheFlushIntvl = 20000;
-        if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[7])) {
-            numTopicStores =
-                    Integer.parseInt(brokerDefaultConfInfoArr[7]);
-        }
-        if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[8])) {
-            unFlushDataHold =
-                    Integer.parseInt(brokerDefaultConfInfoArr[8]);
-        }
-        if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[9])) {
-            memCacheMsgSizeInMB =
-                    Integer.parseInt(brokerDefaultConfInfoArr[9]);
-        }
-        if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[10])) {
-            memCacheMsgCntInK =
-                    Integer.parseInt(brokerDefaultConfInfoArr[10]);
-        }
-        if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[11])) {
-            memCacheFlushIntvl =
-                    Integer.parseInt(brokerDefaultConfInfoArr[11]);
-        }
-        //format broker config to json
-        strBuffer.append("{\"numPartitions\":").append(numPartitions)
-                .append(",\"acceptPublish\":").append(acceptPublish)
-                .append(",\"acceptSubscribe\":").append(acceptSubscribe)
-                .append(",\"unflushThreshold\":").append(unflushThreshold)
-                .append(",\"unflushInterval\":").append(unflushInterval)
-                .append(",\"deleteWhen\":\"").append(deleteWhen)
-                .append("\",\"deletePolicy\":\"").append(deletePolicy)
-                .append("\",\"numTopicStores\":").append(numTopicStores)
-                .append(",\"unflushDataHold\":").append(unFlushDataHold)
-                .append(",\"memCacheMsgSizeInMB\":").append(memCacheMsgSizeInMB)
-                .append(",\"memCacheMsgCntInK\":").append(memCacheMsgCntInK)
-                .append(",\"memCacheFlushIntvl\":").append(memCacheFlushIntvl)
-                .append("}");
-        strBuffer.append(",\"").append(topicListJsonKey).append("\":[");
-        if (topicConfInfoList == null
-                || topicConfInfoList.isEmpty()) {
-            strBuffer.append("]");
-            return strBuffer;
-        }
-        // topic config metadata in the broker
-        // format topic metadata
-        int count = 0;
-        for (String strTopicConfInfo : topicConfInfoList) {
-            if (TStringUtils.isBlank(strTopicConfInfo)) {
-                continue;
-            }
-            String[] topicConfInfoArr =
-                    strTopicConfInfo.split(TokenConstants.ATTR_SEP);
-            if (count++ > 0) {
-                strBuffer.append(",");
-            }
-            strBuffer.append("{\"topicName\":\"").append(topicConfInfoArr[0]).append("\"");
-            int tmpPartNum = numPartitions;
-            if (!TStringUtils.isBlank(topicConfInfoArr[1])) {
-                tmpPartNum = Integer.parseInt(topicConfInfoArr[1]);
-            }
-            strBuffer.append(",\"numPartitions\":").append(tmpPartNum);
-            boolean tmpAcceptPublish = acceptPublish;
-            if (!TStringUtils.isBlank(topicConfInfoArr[2])) {
-                tmpAcceptPublish = Boolean.parseBoolean(topicConfInfoArr[2]);
-            }
-            strBuffer.append(",\"acceptPublish\":").append(tmpAcceptPublish);
-            boolean tmpAcceptSubscribe = acceptSubscribe;
-            if (!TStringUtils.isBlank(topicConfInfoArr[3])) {
-                tmpAcceptSubscribe = Boolean.parseBoolean(topicConfInfoArr[3]);
-            }
-            strBuffer.append(",\"acceptSubscribe\":").append(tmpAcceptSubscribe);
-            int tmpUnflushThreshold = unflushThreshold;
-            if (!TStringUtils.isBlank(topicConfInfoArr[4])) {
-                tmpUnflushThreshold = Integer.parseInt(topicConfInfoArr[4]);
-            }
-            strBuffer.append(",\"unflushThreshold\":").append(tmpUnflushThreshold);
-            int tmpUnflushInterval = unflushInterval;
-            if (!TStringUtils.isBlank(topicConfInfoArr[5])) {
-                tmpUnflushInterval = Integer.parseInt(topicConfInfoArr[5]);
-            }
-            strBuffer.append(",\"unflushInterval\":").append(tmpUnflushInterval);
-            String tmpDeleteWhen = deleteWhen;
-            if (!TStringUtils.isBlank(topicConfInfoArr[6])) {
-                tmpDeleteWhen = topicConfInfoArr[6];
-            }
-            strBuffer.append(",\"deleteWhen\":\"").append(tmpDeleteWhen).append("\"");
-            String tmpDeletePolicy = deletePolicy;
-            if (!TStringUtils.isBlank(topicConfInfoArr[7])) {
-                tmpDeletePolicy = topicConfInfoArr[7];
-            }
-            int tmpNumTopicStores = numTopicStores;
-            if (!TStringUtils.isBlank(topicConfInfoArr[8])) {
-                tmpNumTopicStores = Integer.parseInt(topicConfInfoArr[8]);
-            }
-            strBuffer.append(",\"numTopicStores\":").append(tmpNumTopicStores);
-            strBuffer.append(",\"deletePolicy\":\"").append(tmpDeletePolicy).append("\"");
-            int topicStatusId = TStatusConstants.STATUS_TOPIC_OK;
-            if (!TStringUtils.isBlank(topicConfInfoArr[9])) {
-                topicStatusId = Integer.parseInt(topicConfInfoArr[9]);
-            }
-            int tmpunFlushDataHold = unFlushDataHold;
-            if (!TStringUtils.isBlank(topicConfInfoArr[10])) {
-                tmpunFlushDataHold = Integer.parseInt(topicConfInfoArr[10]);
-            }
-            strBuffer.append(",\"unflushDataHold\":").append(tmpunFlushDataHold);
-            int tmpmemCacheMsgSizeInMB = memCacheMsgSizeInMB;
-            int tmpmemCacheMsgCntInK = memCacheMsgCntInK;
-            int tmpmemCacheFlushIntvl = memCacheFlushIntvl;
-            int tmpMaxMsgSize = ClusterConfigHolder.getMaxMsgSize();
-            int tmpMinMemCacheSize = ClusterConfigHolder.getMinMemCacheSize();
-            if (!TStringUtils.isBlank(topicConfInfoArr[11])) {
-                tmpmemCacheMsgSizeInMB = Integer.parseInt(topicConfInfoArr[11]);
-            }
-            if (!TStringUtils.isBlank(topicConfInfoArr[12])) {
-                tmpmemCacheMsgCntInK = Integer.parseInt(topicConfInfoArr[12]);
-            }
-            if (!TStringUtils.isBlank(topicConfInfoArr[13])) {
-                tmpmemCacheFlushIntvl = Integer.parseInt(topicConfInfoArr[13]);
-            }
-            if (topicConfInfoArr.length > 14) {
-                if (!TStringUtils.isNotBlank(topicConfInfoArr[14])) {
-                    tmpMaxMsgSize = Integer.parseInt(topicConfInfoArr[14]);
-                    Tuple2<Integer, Integer> calcResult =
-                            ClusterConfigHolder.calcMaxMsgSize(tmpMaxMsgSize);
-                    tmpMaxMsgSize = calcResult.getF0();
-                    tmpMinMemCacheSize = calcResult.getF1();
-                }
-            }
-            strBuffer.append(",\"memCacheMsgSizeInMB\":").append(tmpmemCacheMsgSizeInMB);
-            strBuffer.append(",\"memCacheMsgCntInK\":").append(tmpmemCacheMsgCntInK);
-            strBuffer.append(",\"memCacheFlushIntvl\":").append(tmpmemCacheFlushIntvl);
-            strBuffer.append(",\"maxMsgSize\":").append(tmpMaxMsgSize);
-            strBuffer.append(",\"minMemCacheSize\":").append(tmpMinMemCacheSize);
-            strBuffer.append(",\"topicStatusId\":").append(topicStatusId);
-            strBuffer.append("}");
-        }
-        strBuffer.append("]");
-        return strBuffer;
-    }
-
-    /* Format to json */
-    public StringBuilder toJsonString(StringBuilder strBuffer, boolean isOrig) {
-        strBuffer.append("\"BrokerSyncStatusInfo\":{\"type\":\"BrokerSyncStatusInfo\",\"brokerId\":").append(brokerId)
-                .append(",\"brokerAddress\":\"").append(brokerIp).append(":").append(brokerPort)
-                .append("\",\"brokerManageStatus\":").append(brokerManageStatus)
-                .append(",\"brokerRunStatus\":").append(brokerRunStatus)
-                .append(",\"subStepOpTimeInMills\":").append(subStepOpTimeInMills)
-                .append(",\"lastDataReportInMills\":").append(lastDataReportInMills)
-                .append(",\"isBrokerRegister\":").append(isBrokerRegister)
-                .append(",\"isBrokerOnline\":").append(isBrokerOnline)
-                .append(",\"isFirstInit\":").append(isFirstInit)
-                .append(",\"isBrokerConfChanged\":").append(isBrokerConfChanged)
-                .append(",\"isBrokerLoaded\":").append(isBrokerLoaded)
-                .append(",\"isFastStart\":").append(isFastStart);
-        if (isOrig) {
-            strBuffer.append(",\"currBrokerConfId\":").append(currBrokerConfId.get())
-                    .append(",\"currBrokerCheckSumId\":").append(currBrokerCheckSumId)
-                    .append(",\"curBrokerDefaultConfInfo\":\"").append(curBrokerDefaultConfInfo)
-                    .append("\",\"curBrokerTopicSetConfInfo\":\"").append(curBrokerTopicSetConfInfo.toString())
-                    .append("\",\"lastPushBrokerConfId\":").append(lastPushBrokerConfId)
-                    .append(",\"lastPushBrokerCheckSumId\":").append(lastPushBrokerCheckSumId)
-                    .append(",\"lastPushBrokerDefaultConfInfo\":\"").append(lastPushBrokerDefaultConfInfo)
-                    .append("\",\"lastPushBrokerTopicSetConfInfo\":\"")
-                    .append(lastPushBrokerTopicSetConfInfo.toString())
-                    .append(",\"reportedBrokerConfId\":").append(reportedBrokerConfId)
-                    .append(",\"reportedBrokerCheckSumId\":").append(reportedBrokerCheckSumId)
-                    .append(",\"reportedBrokerDefaultConfInfo\":\"").append(reportedBrokerDefaultConfInfo)
-                    .append("\",\"reportedBrokerTopicSetConfInfo\":\"")
-                    .append(reportedBrokerTopicSetConfInfo.toString())
-                    .append("}");
-        } else {
-            strBuffer.append(",\"currBrokerConfId\":").append(currBrokerConfId.get())
-                    .append(",\"currBrokerCheckSumId\":").append(currBrokerCheckSumId);
-            strBuffer = getBrokerAndTopicConfJsonInfo(curBrokerDefaultConfInfo,
-                    "curBrokerDefaultConfInfo",
-                    curBrokerTopicSetConfInfo,
-                    "curBrokerTopicSetConfInfo",
-                    strBuffer);
-            strBuffer.append(",\"lastPushBrokerConfId\":").append(lastPushBrokerConfId)
-                    .append(",\"lastPushBrokerCheckSumId\":").append(lastPushBrokerCheckSumId);
-            strBuffer = getBrokerAndTopicConfJsonInfo(lastPushBrokerDefaultConfInfo,
-                    "lastPushBrokerDefaultConfInfo",
-                    lastPushBrokerTopicSetConfInfo,
-                    "lastPushBrokerTopicSetConfInfo",
-                    strBuffer);
-            strBuffer.append(",\"reportedBrokerConfId\":").append(reportedBrokerConfId)
-                    .append(",\"reportedBrokerCheckSumId\":").append(reportedBrokerCheckSumId);
-            strBuffer = getBrokerAndTopicConfJsonInfo(reportedBrokerDefaultConfInfo,
-                    "reportedBrokerDefaultConfInfo",
-                    reportedBrokerTopicSetConfInfo,
-                    "reportedBrokerTopicSetConfInfo",
-                    strBuffer);
-            strBuffer.append("}");
-        }
-        return strBuffer;
-    }
-
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
index ddfca1e..121a82f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
@@ -32,9 +31,13 @@ import org.apache.tubemq.corebase.cluster.Partition;
 import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
 
+
+
 public class BrokerTopicInfoView {
-    private ConcurrentHashMap<String/* topicName */, TopicInfoView>
-            topicConfInfoMap = new ConcurrentHashMap<>();
+    public AtomicLong topicChangeId = new AtomicLong(0);
+    private ConcurrentHashMap<String/* topicName */,
+            ConcurrentHashMap<Integer/* brokerId */, TopicInfo>> topicConfInfoMap =
+            new ConcurrentHashMap<>();
     private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashSet<String/* topicName */>>
             brokerIdIndexMap = new ConcurrentHashMap<>();
 
@@ -44,7 +47,7 @@ public class BrokerTopicInfoView {
 
     // remove broker all topic info
     public void rmvBrokerTopicInfo(int brokerId) {
-        TopicInfoView topicInfoView;
+        ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
         // remove pub info
         ConcurrentHashSet<String> topicSet =
                 brokerIdIndexMap.remove(brokerId);
@@ -57,11 +60,12 @@ public class BrokerTopicInfoView {
             }
             topicInfoView = topicConfInfoMap.get(topic);
             if (topicInfoView == null
-                    || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+                    || topicInfoView.isEmpty()) {
                 continue;
             }
-            topicInfoView.rmvBrokerTopicInfo(brokerId);
+            topicInfoView.remove(brokerId);
         }
+        topicChangeId.set(System.currentTimeMillis());
     }
 
     /**
@@ -72,8 +76,7 @@ public class BrokerTopicInfoView {
      *                    if topicInfoMap is null, reserve current configure;
      *                    if topicInfoMap is empty, clear current configure.
      */
-    public void updBrokerTopicConfInfo(int brokerId,
-                                       Map<String, TopicInfo> topicInfoMap) {
+    public void updBrokerTopicConfInfo(int brokerId, Map<String, TopicInfo> topicInfoMap) {
         if (topicInfoMap == null) {
             return;
         }
@@ -90,6 +93,7 @@ public class BrokerTopicInfoView {
         rmvBrokerTopicInfo(brokerId, delTopicSet);
         // add or update TopicInfo
         repBrokerTopicInfo(brokerId, topicInfoMap);
+        topicChangeId.set(System.currentTimeMillis());
     }
 
     /**
@@ -98,9 +102,9 @@ public class BrokerTopicInfoView {
      * @param topicSet need query topic set
      */
     public int getMaxTopicBrokerCnt(Set<String> topicSet) {
-        int maxCount = -1;
         int tmpSize;
-        TopicInfoView topicInfoView;
+        int maxCount = -1;
+        ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
         if (topicSet == null || topicSet.isEmpty()) {
             return maxCount;
         }
@@ -110,10 +114,10 @@ public class BrokerTopicInfoView {
             }
             topicInfoView = topicConfInfoMap.get(topic);
             if (topicInfoView == null
-                    || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+                    || topicInfoView.isEmpty()) {
                 continue;
             }
-            tmpSize = topicInfoView.curMapSize.get();
+            tmpSize = topicInfoView.size();
             if (maxCount < tmpSize) {
                 maxCount = tmpSize;
             }
@@ -122,20 +126,24 @@ public class BrokerTopicInfoView {
     }
 
     /**
-     * Gets the list of topic partitions whose subscribe status is enabled
+     * Gets the map of topic partitions whose subscribe status is enabled
      *
      * @param topicSet need query topic set
      */
-    public List<Partition> getAcceptSubParts(Set<String> topicSet,
+    public Map<String, Partition> getAcceptSubParts(Set<String> topicSet,
                                              Set<Integer> enableSubBrokerIdSet) {
-        List<Partition> partList = new ArrayList<>();
+        Map<String, Partition> partMap = new HashMap<>();
         if (topicSet == null || topicSet.isEmpty()) {
-            return partList;
+            return partMap;
         }
+        List<Partition> tmpPartList;
         for (String topic : topicSet) {
-            partList.addAll(getAcceptSubParts(topic, enableSubBrokerIdSet));
+            tmpPartList = getAcceptSubParts(topic, enableSubBrokerIdSet);
+            for (Partition partition : tmpPartList) {
+                partMap.put(partition.getPartitionKey(), partition);
+            }
         }
-        return partList;
+        return partMap;
     }
 
     /**
@@ -143,20 +151,20 @@ public class BrokerTopicInfoView {
      *
      * @param topic need query topic set
      */
-    public List<Partition> getAcceptSubParts(String topic,
-                                             Set<Integer> enableSubBrokerIdSet) {
+    public List<Partition> getAcceptSubParts(String topic, Set<Integer> enableSubBrokerIdSet) {
+        Partition tmpPart;
         TopicInfo topicInfo;
         List<Partition> partList = new ArrayList<>();
         if (topic == null) {
             return partList;
         }
-        TopicInfoView topicInfoView = topicConfInfoMap.get(topic);
+        ConcurrentHashMap<Integer, TopicInfo> topicInfoView =
+                topicConfInfoMap.get(topic);
         if (topicInfoView == null
-                || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+                || topicInfoView.isEmpty()) {
             return partList;
         }
-        for (Map.Entry<Integer, TopicInfo> entry
-                : topicInfoView.brokerTopicInfoMap.entrySet()) {
+        for (Map.Entry<Integer, TopicInfo> entry : topicInfoView.entrySet()) {
             if (entry.getKey() == null
                     || entry.getValue() == null
                     || !enableSubBrokerIdSet.contains(entry.getKey())) {
@@ -184,21 +192,22 @@ public class BrokerTopicInfoView {
     public Map<String, String> getAcceptPubPartInfo(Set<String> topicSet,
                                                     Set<Integer> enablePubBrokerIdSet) {
         TopicInfo topicInfo;
-        TopicInfoView topicInfoView;
+        ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
         Map<String, String> topicPartStrMap = new HashMap<>();
-        Map<String, StringBuilder> topicPartBufferMap =
-                new HashMap<>();
+        Map<String, StringBuilder> topicPartBufferMap = new HashMap<>();
+        if (topicSet == null || topicSet.isEmpty()) {
+            return topicPartStrMap;
+        }
         for (String topic : topicSet) {
             if (topic == null) {
                 continue;
             }
             topicInfoView = topicConfInfoMap.get(topic);
             if (topicInfoView == null
-                    || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+                    || topicInfoView.isEmpty()) {
                 continue;
             }
-            for (Map.Entry<Integer, TopicInfo> entry
-                    : topicInfoView.brokerTopicInfoMap.entrySet()) {
+            for (Map.Entry<Integer, TopicInfo> entry : topicInfoView.entrySet()) {
                 if (entry.getKey() == null
                         || entry.getValue() == null
                         || !enablePubBrokerIdSet.contains(entry.getKey())) {
@@ -238,11 +247,12 @@ public class BrokerTopicInfoView {
      * @return null or topicInfo configure
      */
     public TopicInfo getBrokerPushedTopicInfo(int brokerId, String topic) {
-        TopicInfoView topicInfoView = topicConfInfoMap.get(topic);
+        ConcurrentHashMap<Integer, TopicInfo> topicInfoView =
+                topicConfInfoMap.get(topic);
         if (topicInfoView == null) {
             return null;
         }
-        return topicInfoView.brokerTopicInfoMap.get(brokerId);
+        return topicInfoView.get(brokerId);
     }
 
     /**
@@ -252,7 +262,7 @@ public class BrokerTopicInfoView {
      */
     public List<TopicInfo> getBrokerPushedTopicInfo(int brokerId) {
         TopicInfo topicInfo;
-        TopicInfoView topicInfoView;
+        ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
         List<TopicInfo> topicInfoList = new ArrayList<>();
         ConcurrentHashSet<String> topicSet = brokerIdIndexMap.get(brokerId);
         if (topicSet == null) {
@@ -264,10 +274,10 @@ public class BrokerTopicInfoView {
             }
             topicInfoView = topicConfInfoMap.get(topic);
             if (topicInfoView == null
-                    || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+                    || topicInfoView.isEmpty()) {
                 continue;
             }
-            topicInfo = topicInfoView.brokerTopicInfoMap.get(brokerId);
+            topicInfo = topicInfoView.get(brokerId);
             if (topicInfo == null) {
                 continue;
             }
@@ -282,9 +292,8 @@ public class BrokerTopicInfoView {
         if (delTopicSet == null || delTopicSet.isEmpty()) {
             return;
         }
-        ConcurrentHashSet<String> topicSet =
-                brokerIdIndexMap.get(brokerId);
-        TopicInfoView topicInfoView;
+        ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
+        ConcurrentHashSet<String> topicSet = brokerIdIndexMap.get(brokerId);
         if (topicSet == null || topicSet.isEmpty()) {
             return;
         }
@@ -292,10 +301,10 @@ public class BrokerTopicInfoView {
             topicSet.remove(topic);
             topicInfoView = topicConfInfoMap.get(topic);
             if ((topicInfoView == null)
-                    || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+                    || topicInfoView.isEmpty()) {
                 continue;
             }
-            topicInfoView.rmvBrokerTopicInfo(brokerId);
+            topicInfoView.remove(brokerId);
         }
     }
 
@@ -306,22 +315,22 @@ public class BrokerTopicInfoView {
             return;
         }
         // add topic info
-        TopicInfoView newTopicInfoView;
-        TopicInfoView curTopicInfoView;
+        ConcurrentHashMap<Integer, TopicInfo> newTopicInfoView;
+        ConcurrentHashMap<Integer, TopicInfo> curTopicInfoView;
         for (TopicInfo topicInfo : topicInfoMap.values()) {
             if (topicInfo == null) {
                 continue;
             }
             curTopicInfoView = topicConfInfoMap.get(topicInfo.getTopic());
             if (curTopicInfoView == null) {
-                newTopicInfoView = new TopicInfoView();
+                newTopicInfoView = new ConcurrentHashMap<Integer, TopicInfo>();
                 curTopicInfoView = topicConfInfoMap.putIfAbsent(
                         topicInfo.getTopic(), newTopicInfoView);
                 if (curTopicInfoView == null) {
                     curTopicInfoView = newTopicInfoView;
                 }
             }
-            curTopicInfoView.addOrUpdBrokerTopicInfo(brokerId, topicInfo);
+            curTopicInfoView.put(brokerId, topicInfo.clone());
         }
         // add broker index
         ConcurrentHashSet<String> curTopicSet = brokerIdIndexMap.get(brokerId);
@@ -335,38 +344,4 @@ public class BrokerTopicInfoView {
         curTopicSet.addAll(topicInfoMap.keySet());
     }
 
-    private static class TopicInfoView {
-        public AtomicInteger curMapSize = new AtomicInteger(0);
-        public AtomicLong topicChangeId = new AtomicLong(0);
-        public ConcurrentHashMap<Integer/* brokerId */, TopicInfo> brokerTopicInfoMap =
-                new ConcurrentHashMap<Integer/* brokerId */, TopicInfo>();
-
-        public TopicInfoView() {
-
-        }
-
-        public boolean rmvBrokerTopicInfo(int brokerId) {
-            TopicInfo topicInfo = brokerTopicInfoMap.remove(brokerId);
-            if (topicInfo == null) {
-                return false;
-            }
-            curMapSize.decrementAndGet();
-            topicChangeId.set(System.currentTimeMillis());
-            return true;
-        }
-
-        public boolean addOrUpdBrokerTopicInfo(int brokerId, TopicInfo topicInfo) {
-            if (topicInfo == null) {
-                return false;
-            }
-            TopicInfo newTopicInfo = topicInfo.clone();
-            TopicInfo preTopicInfo =
-                    brokerTopicInfoMap.put(brokerId, newTopicInfo);
-            if (preTopicInfo == null) {
-                curMapSize.incrementAndGet();
-            }
-            topicChangeId.set(System.currentTimeMillis());
-            return true;
-        }
-    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
new file mode 100644
index 0000000..44f5c36
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.nodemanage.nodebroker;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.TErrCodeConstants;
+import org.apache.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.tubemq.corebase.cluster.Partition;
+import org.apache.tubemq.corebase.cluster.TopicInfo;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2B;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2B;
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
+import org.apache.tubemq.corebase.utils.Tuple4;
+import org.apache.tubemq.server.common.heartbeat.HeartbeatManager;
+import org.apache.tubemq.server.common.heartbeat.TimeoutInfo;
+import org.apache.tubemq.server.common.heartbeat.TimeoutListener;
+import org.apache.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.MasterConfig;
+import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.metamanage.MetaDataManager;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class DefBrokerRunManager implements BrokerRunManager {
+    private static final Logger logger =
+            LoggerFactory.getLogger(DefBrokerRunManager.class);
+    // meta data manager
+    private final MetaDataManager metaDataManager;
+    private final HeartbeatManager heartbeatManager;
+    // broker string info
+    private AtomicLong brokerInfoCheckSum = new AtomicLong(System.currentTimeMillis());
+    private long lastBrokerUpdatedTime = System.currentTimeMillis();
+    private final ConcurrentHashMap<Integer, String> brokersMap =
+            new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Integer, String> brokersTLSMap =
+            new ConcurrentHashMap<>();
+
+    // broker sync FSM
+    private AtomicInteger brokerTotalCount = new AtomicInteger(0);
+    private ConcurrentHashMap<Integer/* brokerId */, BrokerRunStatusInfo> brokerRunSyncManageMap =
+            new ConcurrentHashMap<>();
+    // broker abnormal holder
+    private final BrokerAbnHolder brokerAbnHolder;
+    // broker topic configure for consumer and producer
+    private BrokerPSInfoHolder brokerPubSubInfo = new BrokerPSInfoHolder();
+
+
+    public DefBrokerRunManager(TMaster tMaster) {
+        this.metaDataManager = tMaster.getDefMetaDataManager();
+        this.heartbeatManager = tMaster.getHeartbeatManager();
+        MasterConfig masterConfig = tMaster.getMasterConfig();
+        this.brokerAbnHolder =
+                new BrokerAbnHolder(masterConfig.getMaxAutoForbiddenCnt(),
+                        this.metaDataManager);
+        heartbeatManager.regBrokerCheckBusiness(masterConfig.getBrokerHeartbeatTimeoutMs(),
+                new TimeoutListener() {
+                    @Override
+                    public void onTimeout(final String nodeId, TimeoutInfo nodeInfo) throws Exception {
+                        logger.info(new StringBuilder(512).append("[Broker Timeout] ")
+                                .append(nodeId).toString());
+                        releaseBrokerRunInfo(Integer.parseInt(nodeId), nodeInfo.getSecondKey());
+                    }
+                });
+    }
+
+    /**
+     * Pair the current checksum with the TLS or the plain broker info map.
+     */
+    @Override
+    public Tuple2<Long, Map<Integer, String>> getBrokerStaticInfo(boolean isOverTLS) {
+        Map<Integer, String> infoMap = isOverTLS ? brokersTLSMap : brokersMap;
+        return new Tuple2<>(brokerInfoCheckSum.get(), infoMap);
+    }
+
+    @Override
+    public void updBrokerStaticInfo(Map<Integer, BrokerConfEntity> brokerConfMap) {
+        if (brokerConfMap == null || brokerConfMap.isEmpty()) {
+            return;
+        }
+        for (BrokerConfEntity entity : brokerConfMap.values()) {
+            updBrokerStaticInfo(entity);
+        }
+    }
+
+    /**
+     * Cache the plain and TLS info strings of one broker configure; the
+     * checksum is refreshed when either entry is newly added or replaced.
+     */
+    @Override
+    public void updBrokerStaticInfo(BrokerConfEntity entity) {
+        if (entity == null) {
+            return;
+        }
+        final int brokerId = entity.getBrokerId();
+        final String newInfo = entity.getSimpleBrokerInfo();
+        final String newTLSInfo = entity.getSimpleTLSBrokerInfo();
+        final String oldInfo = this.brokersMap.putIfAbsent(brokerId, newInfo);
+        final String oldTLSInfo = this.brokersTLSMap.putIfAbsent(brokerId, newTLSInfo);
+        final boolean infoChanged = (oldInfo != null && !oldInfo.equals(newInfo));
+        final boolean tlsChanged = (oldTLSInfo != null && !oldTLSInfo.equals(newTLSInfo));
+        if (infoChanged) {
+            this.brokersMap.put(brokerId, newInfo);
+        }
+        if (tlsChanged) {
+            this.brokersTLSMap.put(brokerId, newTLSInfo);
+        }
+        if (oldInfo == null || oldTLSInfo == null || infoChanged || tlsChanged) {
+            this.brokerInfoCheckSum.set(System.currentTimeMillis());
+        }
+    }
+
+    /** Drop both cached info strings of a broker; refresh checksum if any existed. */
+    @Override
+    public void delBrokerStaticInfo(int brokerId) {
+        if (brokerId != TBaseConstants.META_VALUE_UNDEFINED) {
+            boolean hadPlain = (this.brokersMap.remove(brokerId) != null);
+            boolean hadTLS = (this.brokersTLSMap.remove(brokerId) != null);
+            if (hadPlain || hadTLS) {
+                this.brokerInfoCheckSum.set(System.currentTimeMillis());
+            }
+        }
+    }
+
+    @Override
+    public Tuple2<Boolean, Boolean> getBrokerPublishStatus(int brokerId) {
+        return brokerPubSubInfo.getBrokerPubStatus(brokerId);
+    }
+
+
+    @Override
+    public BrokerAbnHolder getBrokerAbnHolder() {
+        return this.brokerAbnHolder;
+    }
+
+    @Override
+    public boolean brokerRegister2M(String clientId, BrokerInfo brokerInfo,
+                                    long reportConfigId, int reportCheckSumId,
+                                    boolean isTackData, String repBrokerConfInfo,
+                                    List<String> repTopicConfInfo, boolean isOnline,
+                                    boolean isOverTLS, StringBuilder sBuffer,
+                                    ProcessResult result) {
+        BrokerConfEntity brokerEntry =
+                metaDataManager.getBrokerConfByBrokerId(brokerInfo.getBrokerId());
+        if (brokerEntry == null) {
+            result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+                    sBuffer.append("Not found broker configure info, please create first!")
+                            .append(" the connecting client id is:")
+                            .append(clientId).toString());
+            sBuffer.delete(0, sBuffer.length());
+            return result.isSuccess();
+        }
+        if ((!brokerInfo.getHost().equals(brokerEntry.getBrokerIp()))
+                || (brokerInfo.getPort() != brokerEntry.getBrokerPort())) {
+            result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+                    sBuffer.append("Inconsistent broker configure,please confirm first!")
+                            .append(" the connecting client id is:").append(clientId)
+                            .append(", the configure's broker address by brokerId is:")
+                            .append(brokerEntry.getBrokerIdAndAddress()).toString());
+            sBuffer.delete(0, sBuffer.length());
+            return result.isSuccess();
+        }
+        int confTLSPort = brokerEntry.getBrokerTLSPort();
+        if (confTLSPort != brokerInfo.getTlsPort()) {
+            result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+                    sBuffer.append("Inconsistent TLS configure, please confirm first!")
+                            .append(" the connecting client id is:").append(clientId)
+                            .append(", the configured TLS port is:").append(confTLSPort)
+                            .append(", the broker reported TLS port is ")
+                            .append(brokerInfo.getTlsPort()).toString());
+            sBuffer.delete(0, sBuffer.length());
+            return result.isSuccess();
+        }
+        if (brokerEntry.getManageStatus() == ManageStatus.STATUS_MANAGE_APPLY) {
+            result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+                    sBuffer.append("Broker's configure not online, please online configure first!")
+                            .append(" the connecting client id is:").append(clientId).toString());
+            sBuffer.delete(0, sBuffer.length());
+            return result.isSuccess();
+        }
+        String brokerConfInfo =
+                brokerEntry.getBrokerDefaultConfInfo();
+        Map<String, String> topicConfInfoMap =
+                metaDataManager.getBrokerTopicStrConfigInfo(brokerEntry, sBuffer);
+        //
+        BrokerRunStatusInfo runStatusInfo =
+                brokerRunSyncManageMap.get(brokerInfo.getBrokerId());
+        if (runStatusInfo == null) {
+            BrokerRunStatusInfo tmpRunStatusInfo =
+                    new BrokerRunStatusInfo(this, brokerInfo,
+                            brokerEntry.getManageStatus(), brokerConfInfo,
+                            topicConfInfoMap, isOverTLS);
+            runStatusInfo =
+                    brokerRunSyncManageMap.putIfAbsent(
+                            brokerInfo.getBrokerId(), tmpRunStatusInfo);
+            if (runStatusInfo == null) {
+                brokerTotalCount.incrementAndGet();
+                runStatusInfo = tmpRunStatusInfo;
+            }
+        } else {
+            runStatusInfo.reInitRunStatusInfo(brokerInfo,
+                    brokerEntry.getManageStatus(), brokerConfInfo,
+                    topicConfInfoMap, isOverTLS);
+        }
+        runStatusInfo.bookBrokerReportInfo(true, isOnline, reportConfigId,
+                reportCheckSumId, isTackData, repBrokerConfInfo, repTopicConfInfo, sBuffer);
+        heartbeatManager.regBrokerNode(String.valueOf(brokerInfo.getBrokerId()),
+                runStatusInfo.getCreateId());
+        result.setSuccResult(null);
+        return result.isSuccess();
+    }
+
+    /**
+     * Handle a broker heartbeat: refresh liveness, book the report, clear removed topics.
+     */
+    @Override
+    public boolean brokerHeartBeat2M(int brokerId, long reportConfigId, int reportCheckSumId,
+                                     boolean isTackData, String repBrokerConfInfo,
+                                     List<String> repTopicConfInfo,
+                                     boolean isTackRmvInfo, List<String> removedTopics,
+                                     int rptReadStatus, int rptWriteStatus, boolean isOnline,
+                                     StringBuilder sBuffer, ProcessResult result) {
+        BrokerRunStatusInfo runStatusInfo = brokerRunSyncManageMap.get(brokerId);
+        if (runStatusInfo == null) {
+            result.setFailResult(TErrCodeConstants.HB_NO_NODE, sBuffer
+                    .append("Not found Broker run status info, please register broker first!")
+                    .append(" the connecting client id is:").append(brokerId).toString());
+            // reset the shared buffer, as brokerRegister2M does on its failure paths
+            sBuffer.delete(0, sBuffer.length());
+            return result.isSuccess();
+        }
+        if (!heartbeatManager.updBrokerNode(String.valueOf(brokerId),
+                runStatusInfo.getCreateId(), sBuffer, result)) {
+            return result.isSuccess();
+        }
+        runStatusInfo.bookBrokerReportInfo(false, isOnline, reportConfigId,
+                reportCheckSumId, isTackData, repBrokerConfInfo, repTopicConfInfo, sBuffer);
+        if (isTackRmvInfo) {
+            metaDataManager.clearRmvedTopicConfInfo(brokerId, removedTopics, sBuffer, result);
+            logger.info(sBuffer.append("[Broker Report] receive broker removed topics = ")
+                    .append(removedTopics).append(", removed result is ")
+                    .append(result.getErrInfo()).toString());
+            sBuffer.delete(0, sBuffer.length());
+        }
+        brokerAbnHolder.updateBrokerReportStatus(brokerId, rptReadStatus, rptWriteStatus);
+        result.setSuccResult(null);
+        return result.isSuccess();
+    }
+
+    @Override
+    public boolean brokerClose2M(int brokerId, StringBuilder sBuffer, ProcessResult result) {
+        BrokerRunStatusInfo runStatusInfo =
+                brokerRunSyncManageMap.get(brokerId);
+        if (runStatusInfo == null) {
+            result.setFailResult(TErrCodeConstants.HB_NO_NODE, sBuffer
+                    .append("Not found Broker run status info, please register broker first!")
+                    .append(" the connecting client id is:").append(brokerId).toString());
+            return result.isSuccess();
+        }
+        if (!heartbeatManager.unRegBrokerNode(String.valueOf(brokerId),
+                runStatusInfo.getCreateId())) {
+            logger.info(sBuffer.append("[Broker Closed] brokerId=").append(brokerId)
+                    .append(" unregister failure, run-info has been replaced by new request!")
+                    .toString());
+            return result.isSuccess();
+        }
+        boolean isOverTls = runStatusInfo.isOverTLS();
+        releaseBrokerRunInfo(brokerId, runStatusInfo.getCreateId());
+        logger.info(sBuffer.append("[Broker Closed]").append(brokerId)
+                .append(" unregister success, isOverTLS=").append(isOverTls).toString());
+        result.setSuccResult(null);
+        return result.isSuccess();
+    }
+
+    /**
+     * Read a broker's manage status, default configure string and topic
+     * configure strings from meta data; status stays undefined if not stored.
+     */
+    @Override
+    public Tuple3<ManageStatus, String, Map<String, String>> getBrokerMetaConfigInfo(int brokerId) {
+        BrokerConfEntity confEntity = metaDataManager.getBrokerConfByBrokerId(brokerId);
+        String defConfInfo = (confEntity == null) ? null : confEntity.getBrokerDefaultConfInfo();
+        ManageStatus curStatus = (confEntity == null)
+                ? ManageStatus.STATUS_MANAGE_UNDEFINED : confEntity.getManageStatus();
+        // the entity, even when null, is handed to the topic configure lookup as before
+        Map<String, String> topicConfMap =
+                metaDataManager.getBrokerTopicStrConfigInfo(confEntity, new StringBuilder(512));
+        return new Tuple3<>(curStatus, defConfInfo, topicConfMap);
+    }
+
+    /**
+     * Fill the register response with configure id, checksum id, stop flags and sync data.
+     */
+    @Override
+    public void setRegisterDownConfInfo(int brokerId, StringBuilder sBuffer,
+                                        RegisterResponseM2B.Builder builder) {
+        BrokerRunStatusInfo statusInfo = brokerRunSyncManageMap.get(brokerId);
+        if (statusInfo == null) {
+            logger.info(sBuffer.append("Get Broker run-info failure, brokerId=")
+                    .append(brokerId).append(", please check the implement first!")
+                    .toString());
+            sBuffer.delete(0, sBuffer.length());
+            return;
+        }
+        Tuple4<Long, Integer, String, List<String>> syncData = statusInfo.getNeedSyncData();
+        builder.setCurBrokerConfId(syncData.getF0());
+        builder.setConfCheckSumId(syncData.getF1());
+        Tuple2<Boolean, Boolean> fbdStatus = brokerAbnHolder.getBrokerAutoFbdStatus(brokerId);
+        builder.setStopWrite(fbdStatus.getF0());
+        builder.setStopRead(fbdStatus.getF1());
+        final boolean hasPayload = (syncData.getF2() != null);
+        builder.setTakeConfInfo(hasPayload);
+        if (!hasPayload) {
+            return;
+        }
+        builder.setBrokerDefaultConfInfo(syncData.getF2());
+        builder.addAllBrokerTopicSetConfInfo(syncData.getF3());
+        logger.info(sBuffer.append("[TMaster sync] push broker configure: brokerId = ")
+                .append(brokerId).append(",configureId=").append(syncData.getF0())
+                .append(",stopWrite=").append(builder.getStopWrite())
+                .append(",stopRead=").append(builder.getStopRead())
+                .append(",checksumId=").append(syncData.getF1())
+                .append(",default configure is ").append(syncData.getF2())
+                .append(",topic configure is ").append(syncData.getF3()).toString());
+        sBuffer.delete(0, sBuffer.length());
+    }
+
+    /**
+     * Fill the heartbeat response with configure id, checksum id, stop flags and sync data.
+     */
+    @Override
+    public void setHeatBeatDownConfInfo(int brokerId, StringBuilder sBuffer,
+                                        HeartResponseM2B.Builder builder) {
+        BrokerRunStatusInfo statusInfo = brokerRunSyncManageMap.get(brokerId);
+        if (statusInfo == null) {
+            logger.info(sBuffer.append("Get Broker run-info failure, brokerId=")
+                    .append(brokerId).append(", please check the implement first!")
+                    .toString());
+            sBuffer.delete(0, sBuffer.length());
+            return;
+        }
+        Tuple4<Long, Integer, String, List<String>> syncData = statusInfo.getNeedSyncData();
+        builder.setCurBrokerConfId(syncData.getF0());
+        builder.setConfCheckSumId(syncData.getF1());
+        Tuple2<Boolean, Boolean> fbdStatus = brokerAbnHolder.getBrokerAutoFbdStatus(brokerId);
+        builder.setStopWrite(fbdStatus.getF0());
+        builder.setStopRead(fbdStatus.getF1());
+        final boolean hasPayload = (syncData.getF2() != null);
+        builder.setTakeConfInfo(hasPayload);
+        if (!hasPayload) {
+            return;
+        }
+        builder.setBrokerDefaultConfInfo(syncData.getF2());
+        builder.addAllBrokerTopicSetConfInfo(syncData.getF3());
+        logger.info(sBuffer.append("[TMaster sync] heartbeat sync config: brokerId = ")
+                .append(brokerId).append(",configureId=").append(syncData.getF0())
+                .append(",stopWrite=").append(builder.getStopWrite())
+                .append(",stopRead=").append(builder.getStopRead())
+                .append(",checksumId=").append(syncData.getF1())
+                .append(",default configure is ").append(syncData.getF2())
+                .append(",topic configure is ").append(syncData.getF3()).toString());
+        sBuffer.delete(0, sBuffer.length());
+    }
+
+    @Override
+    public BrokerRunStatusInfo getBrokerRunStatusInfo(int brokerId) {
+        return this.brokerRunSyncManageMap.get(brokerId);
+    }
+
+    /** Look up the BrokerInfo held by a broker's run-info; null when none is stored. */
+    @Override
+    public BrokerInfo getBrokerInfo(int brokerId) {
+        BrokerRunStatusInfo statusInfo = brokerRunSyncManageMap.get(brokerId);
+        if (statusInfo != null) {
+            return statusInfo.getBrokerInfo();
+        }
+        return null;
+    }
+
+    /**
+     * Collect BrokerInfo for the given ids, or for all run-infos when brokerIds is empty.
+     */
+    @Override
+    public Map<Integer, BrokerInfo> getBrokerInfoMap(List<Integer> brokerIds) {
+        Map<Integer, BrokerInfo> resultMap = new HashMap<>();
+        if (brokerIds != null && !brokerIds.isEmpty()) {
+            for (Integer brokerId : brokerIds) {
+                BrokerRunStatusInfo statusInfo = brokerRunSyncManageMap.get(brokerId);
+                if (statusInfo != null) {
+                    resultMap.put(brokerId, statusInfo.getBrokerInfo());
+                }
+            }
+            return resultMap;
+        }
+        for (BrokerRunStatusInfo statusInfo : brokerRunSyncManageMap.values()) {
+            if (statusInfo != null) {
+                BrokerInfo curInfo = statusInfo.getBrokerInfo();
+                resultMap.put(curInfo.getBrokerId(), curInfo);
+            }
+        }
+        return resultMap;
+    }
+
+    /** Release a broker's run-info when blockId still matches its create id. */
+    @Override
+    public boolean releaseBrokerRunInfo(int brokerId, String blockId) {
+        StringBuilder sBuffer = new StringBuilder(512);
+        BrokerRunStatusInfo runStatusInfo = brokerRunSyncManageMap.get(brokerId);
+        if (runStatusInfo == null) {
+            logger.info(sBuffer.append("[Broker Release] brokerId=").append(brokerId)
+                    .append(" release failure, run-info has deleted before!").toString());
+            return false;
+        }
+        if (!blockId.equals(runStatusInfo.getCreateId())) {
+            logger.info(sBuffer.append("[Broker Release] brokerId=").append(brokerId)
+                    .append(" release failure, run-info has been replaced by new register!")
+                    .toString());
+            return false;
+        }
+        // conditional remove: never drop an instance that replaced the checked one
+        if (!brokerRunSyncManageMap.remove(brokerId, runStatusInfo)) {
+            return false;
+        }
+        brokerTotalCount.decrementAndGet();
+        brokerAbnHolder.removeBroker(brokerId);
+        brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId);
+        logger.info(sBuffer.append("[Broker Release] brokerId=")
+                .append(brokerId).append("  release success!").toString());
+        return true;
+    }
+
+    @Override
+    public void updBrokerCsmConfInfo(int brokerId, ManageStatus mngStatus,
+                                     Map<String, TopicInfo> topicInfoMap) {
+        brokerPubSubInfo.updBrokerMangeStatus(brokerId, mngStatus);
+        brokerPubSubInfo.updBrokerSubTopicConfInfo(brokerId, topicInfoMap);
+
+    }
+
+    @Override
+    public void updBrokerPrdConfInfo(int brokerId, ManageStatus mngStatus,
+                                     Map<String, TopicInfo> topicInfoMap) {
+        brokerPubSubInfo.updBrokerMangeStatus(brokerId, mngStatus);
+        brokerPubSubInfo.updBrokerPubTopicConfInfo(brokerId, topicInfoMap);
+    }
+
+    @Override
+    public Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> topicSet) {
+        return brokerPubSubInfo.getAcceptPubPartInfo(topicSet);
+    }
+
+    @Override
+    public int getSubTopicMaxBrokerCount(Set<String> topicSet) {
+        return brokerPubSubInfo.getTopicMaxSubBrokerCnt(topicSet);
+    }
+
+    @Override
+    public Map<String, Partition> getSubBrokerAcceptSubParts(Set<String> topicSet) {
+        return brokerPubSubInfo.getAcceptSubParts(topicSet);
+    }
+
+    @Override
+    public List<Partition> getSubBrokerAcceptSubParts(String topic) {
+        return brokerPubSubInfo.getAcceptSubParts(topic);
+    }
+
+    @Override
+    public TopicInfo getPubBrokerTopicInfo(int brokerId, String topic) {
+        return brokerPubSubInfo.getBrokerPubPushedTopicInfo(brokerId, topic);
+    }
+
+    @Override
+    public List<TopicInfo> getPubBrokerPushedTopicInfo(int brokerId) {
+        return brokerPubSubInfo.getPubBrokerPushedTopicInfo(brokerId);
+    }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
index 19371a1..95c33d9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
@@ -17,17 +17,10 @@
 
 package org.apache.tubemq.server.master.nodemanage.nodebroker;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.tubemq.corebase.cluster.Partition;
-import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
@@ -39,9 +32,6 @@ public class TopicPSInfoManager {
 
     private final TMaster master;
     private final ConcurrentHashMap<String/* topic */,
-            ConcurrentHashMap<BrokerInfo, TopicInfo>> brokerPubInfoMap =
-            new ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String/* topic */,
             ConcurrentHashSet<String/* producerId */>> topicPubInfoMap =
             new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* topic */,
@@ -146,147 +136,7 @@ public class TopicPSInfoManager {
         }
     }
 
-    public ConcurrentHashMap<BrokerInfo, TopicInfo> getBrokerPubInfo(String topic) {
-        return brokerPubInfoMap.get(topic);
-    }
-
-    public void setBrokerPubInfo(String topic,
-                                 ConcurrentHashMap<BrokerInfo, TopicInfo> brokerPubInfo) {
-        brokerPubInfoMap.put(topic, brokerPubInfo);
-    }
-
-    public int getTopicMaxBrokerCount(Set<String> topicSet) {
-        int maxCount = -1;
-        if (topicSet == null) {
-            return maxCount;
-        }
-        for (String topicTtem : topicSet) {
-            if (topicTtem == null) {
-                continue;
-            }
-            ConcurrentHashMap<BrokerInfo, TopicInfo> tmpBrokerMap =
-                    brokerPubInfoMap.get(topicTtem);
-            if (tmpBrokerMap == null) {
-                continue;
-            }
-            int tmpSize =
-                    tmpBrokerMap.keySet().size();
-            if (maxCount < tmpSize) {
-                maxCount = tmpSize;
-            }
-        }
-        return maxCount;
-    }
-
-    public Set<Partition> getPartitions() {
-        Set<Partition> partitions = new HashSet<>();
-        for (Map<BrokerInfo, TopicInfo> broker
-                : this.brokerPubInfoMap.values()) {
-            for (Map.Entry<BrokerInfo, TopicInfo> entry
-                    : broker.entrySet()) {
-                TopicInfo topicInfo = entry.getValue();
-                for (int j = 0; j < topicInfo.getTopicStoreNum(); j++) {
-                    int baseValue = j * TBaseConstants.META_STORE_INS_BASE;
-                    for (int i = 0; i < topicInfo.getPartitionNum(); i++) {
-                        partitions.add(new Partition(entry.getKey(),
-                                topicInfo.getTopic(), baseValue + i));
-                    }
-                }
-            }
-        }
-        return partitions;
-    }
-
-    public Set<Partition> getPartitions(Set<String> topics) {
-        Set<Partition> partList = new HashSet<>();
-        for (String topic : topics) {
-            partList.addAll(getPartitionSet(topic));
-        }
-        return partList;
-    }
-
-    public Map<String, Partition> getPartitionMap(Set<String> topics) {
-        Map<String, Partition> partMap = new HashMap<>();
-        for (String topic : topics) {
-            ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
-                    brokerPubInfoMap.get(topic);
-            if (topicInfoMap == null) {
-                continue;
-            }
-            for (Map.Entry<BrokerInfo, TopicInfo> entry : topicInfoMap.entrySet()) {
-                TopicInfo topicInfo = entry.getValue();
-                if (!topicInfo.isAcceptSubscribe()) {
-                    continue;
-                }
-                for (int j = 0; j < topicInfo.getTopicStoreNum(); j++) {
-                    int baseValue = j * TBaseConstants.META_STORE_INS_BASE;
-                    for (int i = 0; i < topicInfo.getPartitionNum(); i++) {
-                        Partition partition =
-                                new Partition(entry.getKey(), topicInfo.getTopic(), baseValue + i);
-                        partMap.put(partition.getPartitionKey(), partition);
-                    }
-                }
-            }
-        }
-        return partMap;
-    }
-
-    public Set<Partition> getPartitionSet(String topic) {
-        Set<Partition> partSet = new HashSet<>();
-        ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
-                brokerPubInfoMap.get(topic);
-        if (topicInfoMap == null) {
-            return new HashSet<>();
-        }
-        for (Map.Entry<BrokerInfo, TopicInfo> entry
-                : topicInfoMap.entrySet()) {
-            TopicInfo topicInfo = entry.getValue();
-            if (!topicInfo.isAcceptSubscribe()) {
-                continue;
-            }
-            for (int j = 0; j < topicInfo.getTopicStoreNum(); j++) {
-                int baseValue = j * TBaseConstants.META_STORE_INS_BASE;
-                for (int i = 0; i < topicInfo.getPartitionNum(); i++) {
-                    partSet.add(new Partition(entry.getKey(),
-                            topicInfo.getTopic(), baseValue + i));
-                }
-            }
-
-        }
-        return partSet;
-    }
-
-    public List<Partition> getPartitionList(String topic) {
-        List<Partition> partList = new ArrayList<>(getPartitionSet(topic));
-        return partList;
-    }
-
-    public TopicInfo getTopicInfo(String topic, BrokerInfo broker) {
-        ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
-                brokerPubInfoMap.get(topic);
-        if (topicInfoMap != null) {
-            return topicInfoMap.get(broker);
-        }
-        return null;
-    }
-
-    public List<TopicInfo> getBrokerPubInfoList(BrokerInfo broker) {
-        List<TopicInfo> topicInfoList = new ArrayList<>();
-        for (Map.Entry<String, ConcurrentHashMap<BrokerInfo, TopicInfo>> pubEntry
-                : brokerPubInfoMap.entrySet()) {
-            ConcurrentHashMap<BrokerInfo, TopicInfo> topicPubMap =
-                    pubEntry.getValue();
-            for (Map.Entry<BrokerInfo, TopicInfo> entry : topicPubMap.entrySet()) {
-                if (entry.getKey().equals(broker)) {
-                    topicInfoList.add(entry.getValue());
-                }
-            }
-        }
-        return topicInfoList;
-    }
-
     public void clear() {
-        brokerPubInfoMap.clear();
         topicPubInfoMap.clear();
         topicSubInfoMap.clear();
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
index d9d9279..e3f0612 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
@@ -36,6 +36,7 @@ import org.apache.tubemq.corerpc.exception.StandbyException;
 import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
 import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
 import org.apache.tubemq.server.master.web.simplemvc.Action;
@@ -216,16 +217,17 @@ public class Master implements Action {
     private void innGetBrokerInfo(final HttpServletRequest req,
                                            StringBuilder sBuilder, boolean isOldRet) {
         Map<Integer, BrokerInfo> brokerInfoMap = null;
+        BrokerRunManager brokerRunManager = master.getBrokerRunManager();
         String brokerIds = req.getParameter("ids");
         if (TStringUtils.isBlank(brokerIds)) {
-            brokerInfoMap = master.getBrokerHolder().getBrokerInfoMap();
+            brokerInfoMap = brokerRunManager.getBrokerInfoMap(null);
         } else {
             String[] brokerIdArr = brokerIds.split(",");
             List<Integer> idList = new ArrayList<>(brokerIdArr.length);
             for (String strId : brokerIdArr) {
                 idList.add(Integer.parseInt(strId));
             }
-            brokerInfoMap = master.getBrokerHolder().getBrokerInfos(idList);
+            brokerInfoMap = brokerRunManager.getBrokerInfoMap(idList);
         }
         if (brokerInfoMap != null) {
             int index = 1;
@@ -234,8 +236,8 @@ public class Master implements Action {
                 sBuilder.append("\n################################## ")
                         .append(index).append(". ").append(broker.toString())
                         .append(" ##################################\n");
-                TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
-                List<TopicInfo> topicInfoList = topicPSInfoManager.getBrokerPubInfoList(broker);
+                List<TopicInfo> topicInfoList =
+                        brokerRunManager.getPubBrokerPushedTopicInfo(broker.getBrokerId());
                 Map<String, TopicDeployEntity> topicConfigMap =
                         metaDataManager.getBrokerTopicConfEntitySet(broker.getBrokerId());
                 if (topicConfigMap == null) {
@@ -296,7 +298,7 @@ public class Master implements Action {
      */
     private void getUnbalanceGroupInfo(StringBuilder sBuilder) {
         ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
-        TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+        BrokerRunManager brokerRunManager = master.getBrokerRunManager();
         Map<String, Map<String, Map<String, Partition>>> currentSubInfoMap =
                 master.getCurrentSubInfoMap();
         int currPartSize = 0;
@@ -319,7 +321,8 @@ public class Master implements Action {
                         }
                     }
                 }
-                List<Partition> partList = topicPSInfoManager.getPartitionList(topic);
+                List<Partition> partList =
+                        brokerRunManager.getSubBrokerAcceptSubParts(topic);
                 if (currPartSize != partList.size()) {
                     sBuilder.append(group).append(":").append(topic).append("\n");
                 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
index 909a888..771aad1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
@@ -28,8 +28,7 @@ import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.TubeServerVersion;
 import org.apache.tubemq.server.master.TMaster;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.tubemq.server.master.web.common.BrokerQueryResult;
 import org.apache.tubemq.server.master.web.model.BrokerVO;
 import org.apache.tubemq.server.master.web.simplemvc.Action;
@@ -57,8 +56,9 @@ public class BrokerList implements Action {
         int pageSize = TStringUtils.isNotEmpty(strPageSize)
                 ? Integer.parseInt(strPageSize) : 10;
         pageSize = pageSize <= 10 ? 10 : pageSize;
-        BrokerInfoHolder infoHolder = master.getBrokerHolder();
-        List<BrokerInfo> brokerInfoList = new ArrayList(infoHolder.getBrokerInfoMap().values());
+        BrokerRunManager brokerRunManager = master.getBrokerRunManager();
+        List<BrokerInfo> brokerInfoList =
+                new ArrayList(brokerRunManager.getBrokerInfoMap(null).values());
         // *************************************************************************************
         for (int i = 0; i < 95; i++) {
             BrokerInfo info = new BrokerInfo(i, "192.168.0.1", 8123);
@@ -85,12 +85,12 @@ public class BrokerList implements Action {
                             + pageSize;
             List<BrokerInfo> firstPageList = brokerInfoList.subList(fromIndex, toIndex);
             brokerVOList = new ArrayList<>(brokerInfoList.size());
-            TopicPSInfoManager psInfoManager = master.getTopicPSInfoManager();
             for (BrokerInfo brokerInfo : firstPageList) {
                 BrokerVO brokerVO = new BrokerVO();
                 brokerVO.setId(brokerInfo.getBrokerId());
                 brokerVO.setIp(brokerInfo.getHost());
-                List<TopicInfo> topicInfoList = psInfoManager.getBrokerPubInfoList(brokerInfo);
+                List<TopicInfo> topicInfoList =
+                        brokerRunManager.getPubBrokerPushedTopicInfo(brokerInfo.getBrokerId());
                 brokerVO.setTopicCount(topicInfoList.size());
                 int totalPartitionNum = 0;
                 for (TopicInfo topicInfo : topicInfoList) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
index c7fd104..1971caa 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
@@ -29,9 +29,9 @@ import org.apache.tubemq.corebase.cluster.BrokerInfo;
 import org.apache.tubemq.corebase.utils.AddressUtils;
 import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.TStatusConstants;
 import org.apache.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.tubemq.server.common.statusdef.StepStatus;
 import org.apache.tubemq.server.common.statusdef.TopicStatus;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.common.utils.WebParameterUtils;
@@ -42,8 +42,9 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerCon
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunStatusInfo;
 
 
 /**
@@ -468,8 +469,8 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
             return sBuffer;
         }
         String relReason = (String) result.getRetData();
-        BrokerInfoHolder brokerInfoHolder = master.getBrokerHolder();
-        brokerInfoHolder.relAutoForbiddenBrokerInfo(brokerIds, relReason);
+        BrokerAbnHolder abnHolder = master.getBrokerAbnHolder();
+        abnHolder.relAutoForbiddenBrokerInfo(brokerIds, relReason);
         WebParameterUtils.buildSuccessResult(sBuffer);
         return sBuffer;
     }
@@ -534,25 +535,26 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
         // query current broker configures
         Map<Integer, BrokerConfEntity> brokerConfEntityMap =
                 metaDataManager.getBrokerConfInfo(brokerIds, brokerIpSet, null);
-        BrokerInfoHolder brokerInfoHolder = master.getBrokerHolder();
-        Map<Integer, BrokerInfoHolder.BrokerAbnInfo> brokerAbnInfoMap =
-                brokerInfoHolder.getBrokerAbnormalMap();
-        Map<Integer, BrokerInfoHolder.BrokerFbdInfo> brokerFbdInfoMap =
-                brokerInfoHolder.getAutoForbiddenBrokerMapInfo();
+        BrokerAbnHolder abnHolder = master.getBrokerAbnHolder();
+        BrokerRunManager brokerRunManager = master.getBrokerRunManager();
+        Map<Integer, BrokerAbnHolder.BrokerAbnInfo> brokerAbnInfoMap =
+                abnHolder.getBrokerAbnormalMap();
+        Map<Integer, BrokerAbnHolder.BrokerFbdInfo> brokerFbdInfoMap =
+                abnHolder.getAutoForbiddenBrokerMapInfo();
         int totalCnt = 0;
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
         for (BrokerConfEntity entity : brokerConfEntityMap.values()) {
-            BrokerInfoHolder.BrokerAbnInfo brokerAbnInfo =
+            BrokerAbnHolder.BrokerAbnInfo brokerAbnInfo =
                     brokerAbnInfoMap.get(entity.getBrokerId());
             if (onlyAbnormal && brokerAbnInfo == null) {
                 continue;
             }
-            BrokerInfoHolder.BrokerFbdInfo brokerForbInfo =
+            BrokerAbnHolder.BrokerFbdInfo brokerForbInfo =
                     brokerFbdInfoMap.get(entity.getBrokerId());
             if (onlyAutoForbidden && brokerForbInfo == null) {
                 continue;
             }
-            BrokerInfo brokerInfo = brokerInfoHolder.getBrokerInfo(entity.getBrokerId());
+            BrokerInfo brokerInfo = brokerRunManager.getBrokerInfo(entity.getBrokerId());
             if (onlyEnableTLS && (brokerInfo == null || !brokerInfo.isEnableTLS())) {
                 continue;
             }
@@ -588,39 +590,41 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
             } else {
                 Tuple2<Boolean, Boolean> pubSubTuple =
                         entity.getManageStatus().getPubSubStatus();
-                BrokerSyncStatusInfo brokerSyncStatusInfo =
-                        metaDataManager.getBrokerRunSyncStatusInfo(entity.getBrokerId());
-                if (brokerSyncStatusInfo == null) {
+                BrokerRunStatusInfo runStatusInfo =
+                        brokerRunManager.getBrokerRunStatusInfo(entity.getBrokerId());
+                if (runStatusInfo == null) {
                     sBuffer.append(",\"runStatus\":\"unRegister\",\"subStatus\":\"-\"")
                             .append(",\"isConfChanged\":\"-\",\"isConfLoaded\":\"-\",\"isBrokerOnline\":\"-\"")
                             .append(",\"brokerVersion\":\"-\",\"acceptPublish\":\"-\",\"acceptSubscribe\":\"-\"");
                 } else {
-                    int stepStatus = brokerSyncStatusInfo.getBrokerRunStatus();
-                    if (brokerSyncStatusInfo.isBrokerOnline()) {
-                        if (stepStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
+                    StepStatus stepStatus = runStatusInfo.getCurStepStatus();
+                    if (runStatusInfo.isOnline()) {
+                        if (stepStatus == StepStatus.STEP_STATUS_UNDEFINED) {
                             sBuffer.append(",\"runStatus\":\"running\",\"subStatus\":\"idle\"");
                         } else {
                             sBuffer.append(",\"runStatus\":\"running\"")
                                     .append(",\"subStatus\":\"processing_event\",\"stepOp\":")
-                                    .append(stepStatus);
+                                    .append(stepStatus.getCode());
                         }
                     } else {
-                        if (stepStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
+                        if (stepStatus == StepStatus.STEP_STATUS_UNDEFINED) {
                             sBuffer.append(",\"runStatus\":\"notRegister\",\"subStatus\":\"idle\"");
                         } else {
                             sBuffer.append(",\"runStatus\":\"notRegister\"")
                                     .append(",\"subStatus\":\"processing_event\",\"stepOp\":")
-                                    .append(stepStatus);
+                                    .append(stepStatus.getCode());
                         }
                     }
-                    sBuffer.append(",\"isConfChanged\":\"").append(brokerSyncStatusInfo.isBrokerConfChanged())
-                            .append("\",\"isConfLoaded\":\"").append(brokerSyncStatusInfo.isBrokerLoaded())
-                            .append("\",\"isBrokerOnline\":\"").append(brokerSyncStatusInfo.isBrokerOnline())
+                    Tuple2<Boolean, Boolean> syncTuple =
+                            runStatusInfo.getDataSyncStatus();
+                    sBuffer.append(",\"isConfChanged\":\"").append(syncTuple.getF0())
+                            .append("\",\"isConfLoaded\":\"").append(syncTuple.getF1())
+                            .append("\",\"isBrokerOnline\":\"").append(runStatusInfo.isOnline())
                             .append("\"").append(",\"brokerVersion\":\"-\",\"acceptPublish\":\"")
                             .append(pubSubTuple.getF0()).append("\",\"acceptSubscribe\":\"")
                             .append(pubSubTuple.getF1()).append("\"");
                     if (withDetail) {
-                        sBuffer = brokerSyncStatusInfo.toJsonString(sBuffer.append(","), false);
+                        sBuffer = runStatusInfo.toJsonString(sBuffer.append(","));
                     }
                 }
             }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index c8f28ba..c045c4d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -23,7 +23,6 @@ import java.util.Set;
 
 import javax.servlet.http.HttpServletRequest;
 import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
 import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.common.TServerConstants;
@@ -37,7 +36,7 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSe
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
 import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
 
@@ -248,7 +247,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
         // query topic configure info
         Map<String, List<TopicDeployEntity>> topicConfMap =
                 metaDataManager.getTopicConfMapByTopicAndBrokerIds(topicNameSet, brokerIds);
-        TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+        BrokerRunManager brokerRunManager = master.getBrokerRunManager();
         int totalCount = 0;
         int brokerCount = 0;
         int totalCfgNumPartCount = 0;
@@ -285,11 +284,8 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
                     isAcceptPublish = pubSubStatus.getF0();
                     isAcceptSubscribe = pubSubStatus.getF1();
                 }
-                BrokerInfo broker =
-                        new BrokerInfo(entity.getBrokerId(),
-                                entity.getBrokerIp(), entity.getBrokerPort());
-                TopicInfo topicInfo = topicPSInfoManager.getTopicInfo(
-                        entity.getTopicName(), broker);
+                TopicInfo topicInfo =
+                        brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
                 if (topicInfo != null) {
                     if (isAcceptPublish && topicInfo.isAcceptPublish()) {
                         isSrvAcceptPublish = true;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index aba06c8..5dc45e9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -25,7 +25,6 @@ import java.util.Set;
 import javax.servlet.http.HttpServletRequest;
 
 import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
 import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.common.fielddef.WebFieldDef;
@@ -43,8 +42,8 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupCons
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunStatusInfo;
 
 
 
@@ -250,6 +249,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
         int dataCount = 0;
         int totalStoreNum = 0;
         int totalNumPartCount = 0;
+        BrokerRunManager brokerRunManager = master.getBrokerRunManager();
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
         for (Map.Entry<Integer, List<TopicDeployEntity>> entry : queryResult.entrySet()) {
             if (entry.getKey() == null || entry.getValue() == null) {
@@ -260,8 +260,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
             if (brokerConf == null) {
                 continue;
             }
-            BrokerSyncStatusInfo brokerSyncInfo =
-                    metaDataManager.getBrokerRunSyncStatusInfo(entry.getKey());
+            BrokerRunStatusInfo runStatusInfo =
+                    brokerRunManager.getBrokerRunStatusInfo(entry.getKey());
             List<TopicDeployEntity> topicDeployInfo = entry.getValue();
             // build return info item
             if (dataCount++ > 0) {
@@ -277,22 +277,24 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
                     brokerConf.getManageStatus().getDescription();
             Tuple2<Boolean, Boolean> pubSubStatus =
                     brokerConf.getManageStatus().getPubSubStatus();
-            if (brokerSyncInfo == null) {
+            if (runStatusInfo == null) {
                 sBuffer.append("\"acceptPublish\":\"-\"")
                         .append(",\"acceptSubscribe\":\"-\"")
                         .append(",\"totalPartitionNum\":\"-\"")
                         .append(",\"totalTopicStoreNum\":\"-\"")
                         .append(",\"brokerManageStatus\":\"-\"");
             } else {
+                Tuple2<Boolean, Boolean> publishTuple =
+                        brokerRunManager.getBrokerPublishStatus(entry.getKey());
                 if (pubSubStatus.getF0()) {
                     sBuffer.append("\"acceptPublish\":")
-                            .append(brokerSyncInfo.isAcceptPublish());
+                            .append(publishTuple.getF0());
                 } else {
                     sBuffer.append("\"acceptPublish\":false");
                 }
                 if (pubSubStatus.getF1()) {
                     sBuffer.append(",\"acceptSubscribe\":")
-                            .append(brokerSyncInfo.isAcceptSubscribe());
+                            .append(publishTuple.getF1());
                 } else {
                     sBuffer.append(",\"acceptSubscribe\":false");
                 }
@@ -494,7 +496,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
         boolean isAcceptSubscribe = false;
         ManageStatus manageStatus;
         Tuple2<Boolean, Boolean> pubSubStatus;
-        TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+        BrokerRunManager brokerRunManager = master.getBrokerRunManager();
         ClusterSettingEntity defSetting = metaDataManager.getClusterDefSetting(false);
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
         for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) {
@@ -533,10 +535,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
                     isAcceptPublish = pubSubStatus.getF0();
                     isAcceptSubscribe = pubSubStatus.getF1();
                 }
-                BrokerInfo broker = new BrokerInfo(entity.getBrokerId(),
-                        entity.getBrokerIp(), entity.getBrokerPort());
                 TopicInfo topicInfo =
-                        topicPSInfoManager.getTopicInfo(entity.getTopicName(), broker);
+                        brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
                 if (topicInfo == null) {
                     sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
                             .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
@@ -634,7 +634,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
         boolean isAcceptSubscribe = false;
         ManageStatus manageStatus;
         Tuple2<Boolean, Boolean> pubSubStatus;
-        TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+        BrokerRunManager brokerRunManager = master.getBrokerRunManager();
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
         for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployMap.entrySet()) {
             totalCfgNumPartCount = 0;
@@ -665,10 +665,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
                     isAcceptPublish = pubSubStatus.getF0();
                     isAcceptSubscribe = pubSubStatus.getF1();
                 }
-                BrokerInfo broker = new BrokerInfo(entity.getBrokerId(),
-                        entity.getBrokerIp(), entity.getBrokerPort());
                 TopicInfo topicInfo =
-                        topicPSInfoManager.getTopicInfo(entity.getTopicName(), broker);
+                        brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
                 if (topicInfo == null) {
                     sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
                             .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
diff --git a/tubemq-server/src/test/java/org/apache/tubemq/server/common/HeartbeatManagerTest.java b/tubemq-server/src/test/java/org/apache/tubemq/server/common/HeartbeatManagerTest.java
index 14f91e6..e76b783 100644
--- a/tubemq-server/src/test/java/org/apache/tubemq/server/common/HeartbeatManagerTest.java
+++ b/tubemq-server/src/test/java/org/apache/tubemq/server/common/HeartbeatManagerTest.java
@@ -51,7 +51,7 @@ public class HeartbeatManagerTest {
                                 .append(nodeId).toString());
                     }
                 });
-        heartbeatManager.regBrokerNode("node1");
+        heartbeatManager.regBrokerNode("node1", String.valueOf(System.currentTimeMillis()));
         Assert.assertTrue(heartbeatManager.getBrokerRegMap().get("node1").getTimeoutTime() >
                 System.currentTimeMillis());
     }
diff --git a/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolderTest.java b/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolderTest.java
deleted file mode 100644
index 7f2c051..0000000
--- a/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolderTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.nodemanage.nodebroker;
-
-import static org.mockito.Mockito.mock;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.tubemq.server.master.metamanage.MetaDataManager;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BrokerInfoHolderTest {
-    private BrokerInfoHolder brokerInfoHolder;
-    private MetaDataManager metaDataManager;
-
-    @Before
-    public void setUp() throws Exception {
-        metaDataManager = mock(MetaDataManager.class);
-        brokerInfoHolder = new BrokerInfoHolder(10, metaDataManager);
-    }
-
-    @Test
-    public void testBrokerInfo() {
-        BrokerInfo brokerInfo = mock(BrokerInfo.class);
-
-        brokerInfoHolder.setBrokerInfo(1, brokerInfo);
-        Assert.assertEquals(brokerInfo, brokerInfoHolder.getBrokerInfo(1));
-
-        brokerInfoHolder.removeBroker(1);
-        Assert.assertNull(brokerInfoHolder.getBrokerInfo(1));
-    }
-}