You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/04/16 10:00:25 UTC

[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-596] Add WebTopicConfHandler class implementation

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
     new 3030686  [INLONG-596] Add WebTopicConfHandler class implementation
3030686 is described below

commit 30306863c72e9726f9eadb65c819722b22003550
Author: gosonzhang <go...@tencent.com>
AuthorDate: Thu Apr 15 14:54:53 2021 +0800

    [INLONG-596] Add WebTopicConfHandler class implementation
---
 .../tubemq/server/common/fielddef/WebFieldDef.java |   7 +-
 .../server/common/statusdef/ManageStatus.java      |  29 +-
 .../server/master/metamanage/MetaDataManager.java  | 512 +++++++++---
 .../metastore/BdbMetaStoreServiceImpl.java         | 143 +++-
 .../metamanage/metastore/MetaStoreService.java     |  36 +-
 .../metastore/dao/entity/ClusterSettingEntity.java |  27 +-
 .../metastore/dao/entity/TopicCtrlEntity.java      |  69 +-
 ...eployConfEntity.java => TopicDeployEntity.java} | 107 ++-
 ...loyConfigMapper.java => TopicDeployMapper.java} |  26 +-
 ...pperImpl.java => BdbTopicDeployMapperImpl.java} | 172 ++--
 .../master/web/handler/TopicProcessResult.java     |  48 ++
 .../web/handler/WebAdminFlowRuleHandler.java       | 520 +++++++++++++
 .../master/web/handler/WebBrokerConfHandler.java   |  18 +-
 .../master/web/handler/WebMasterInfoHandler.java   |   8 +-
 .../master/web/handler/WebTopicConfHandler.java    | 862 +++++++++++++++++++++
 15 files changed, 2298 insertions(+), 286 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 1ecca31..dafd8b1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -217,10 +217,9 @@ public enum WebFieldDef {
             "Broker ip", TBaseConstants.META_MAX_BROKER_IP_LENGTH,
              RegexDef.TMP_IPV4ADDRESS),
     ISRESERVEDDATA(77, "isReservedData", "isRsvDt",
-            WebFieldType.BOOLEAN, "Whether to keep topic data in the broker");
-
-
-
+            WebFieldType.BOOLEAN, "Whether to keep topic data in the broker"),
+    WITHCTRLINFO(78, "ctrlData", "cD",
+              WebFieldType.BOOLEAN, "With topic control data info.");
 
 
     public final int id;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
index bfb9975..e1f1962 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
@@ -17,22 +17,31 @@
 
 package org.apache.tubemq.server.common.statusdef;
 
+import org.apache.tubemq.corebase.utils.Tuple2;
+
+
 public enum ManageStatus {
 
-    STATUS_MANAGE_UNDEFINED(-2, "Undefined."),
-    STATUS_MANAGE_APPLY(1, "Apply."),
-    STATUS_MANAGE_ONLINE(5, "Online."),
-    STATUS_MANAGE_ONLINE_NOT_WRITE(6, "Online with not write"),
-    STATUS_MANAGE_ONLINE_NOT_READ(7, "Online with not read"),
-    STATUS_MANAGE_OFFLINE(9, "Offline");
+    STATUS_MANAGE_UNDEFINED(-2, "-", false, false),
+    STATUS_MANAGE_APPLY(1, "draft", false, false),
+    STATUS_MANAGE_ONLINE(5, "online", true, true),
+    STATUS_MANAGE_ONLINE_NOT_WRITE(6, "only-read", false, true),
+    STATUS_MANAGE_ONLINE_NOT_READ(7, "only-write", true, false),
+    STATUS_MANAGE_OFFLINE(9, "offline", false, false);
 
     private int code;
     private String description;
+    private boolean isAcceptPublish;
+    private boolean isAcceptSubscribe;
 
 
-    ManageStatus(int code, String description) {
+    ManageStatus(int code, String description,
+                 boolean isAcceptPublish,
+                 boolean isAcceptSubscribe) {
         this.code = code;
         this.description = description;
+        this.isAcceptPublish = isAcceptPublish;
+        this.isAcceptSubscribe = isAcceptSubscribe;
     }
 
     public boolean isOnlineStatus() {
@@ -53,6 +62,11 @@ public enum ManageStatus {
         return description;
     }
 
+    public Tuple2<Boolean, Boolean> getPubSubStatus() {
+        return new Tuple2<>(isAcceptPublish, isAcceptSubscribe);
+    }
+
+
     public static ManageStatus valueOf(int code) {
         for (ManageStatus status : ManageStatus.values()) {
             if (status.getCode() == code) {
@@ -62,4 +76,5 @@ public enum ManageStatus {
         throw new IllegalArgumentException(String.format(
                 "unknown broker manage status code %s", code));
     }
+
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index d3e79dc..9cdd0d3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -55,12 +55,13 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlac
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.TargetValidResult;
 import org.apache.tubemq.server.master.web.handler.BrokerProcessResult;
 import org.apache.tubemq.server.master.web.handler.GroupProcessResult;
+import org.apache.tubemq.server.master.web.handler.TopicProcessResult;
 import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -643,13 +644,13 @@ public class MetaDataManager implements Server {
         if (!metaStoreService.checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        Map<String, TopicDeployConfEntity> confEntityMap =
+        Map<String, TopicDeployEntity> confEntityMap =
                 metaStoreService.getConfiguredTopicInfo(brokerId);
         if (confEntityMap == null || confEntityMap.isEmpty()) {
             return result.isSuccess();
         }
         for (String topicName : rmvTopics) {
-            TopicDeployConfEntity topicEntity = confEntityMap.get(topicName);
+            TopicDeployEntity topicEntity = confEntityMap.get(topicName);
             if (topicEntity != null
                     && topicEntity.getTopicStatus() == TopicStatus.STATUS_TOPIC_SOFT_REMOVE) {
                 confDelTopicConfInfo(topicEntity.getModifyUser(),
@@ -679,12 +680,12 @@ public class MetaDataManager implements Server {
         if (!metaStoreService.checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        Map<String, TopicDeployConfEntity> confEntityMap =
+        Map<String, TopicDeployEntity> confEntityMap =
                 metaStoreService.getConfiguredTopicInfo(brokerId);
         if (confEntityMap == null || confEntityMap.isEmpty()) {
             return result.isSuccess();
         }
-        for (TopicDeployConfEntity topicEntity : confEntityMap.values()) {
+        for (TopicDeployEntity topicEntity : confEntityMap.values()) {
             if (topicEntity == null) {
                 continue;
             }
@@ -738,7 +739,7 @@ public class MetaDataManager implements Server {
         return metaStoreService.getBrokerConfByBrokerIp(brokerIp);
     }
 
-    public Map<String, TopicDeployConfEntity> getBrokerTopicConfEntitySet(int brokerId) {
+    public Map<String, TopicDeployEntity> getBrokerTopicConfEntitySet(int brokerId) {
         return metaStoreService.getConfiguredTopicInfo(brokerId);
     }
 
@@ -882,115 +883,357 @@ public class MetaDataManager implements Server {
 
     // ////////////////////////////////////////////////////////////////////////////
 
-    /**
-     * Get broker topic entity, if query entity is null, return all topic entity
-     *
-     * @param qryEntity query conditions
-     * @return topic entity map
-     */
-    public Map<String, List<TopicDeployConfEntity>> getTopicConfEntityMap(
-            TopicDeployConfEntity qryEntity) {
-        return metaStoreService.getTopicConfMap(qryEntity);
+    public List<TopicProcessResult> addTopicDeployInfo(long dataVerId, String createUsr,
+                                                       Date createDate, Set<Integer> brokerIdSet,
+                                                       Set<String> topicNameSet,
+                                                       TopicPropGroup topicPropInfo,
+                                                       StringBuilder sBuilder,
+                                                       ProcessResult result) {
+        TopicDeployEntity deployConf;
+        List<TopicProcessResult> retInfo = new ArrayList<>();
+        // add topic control info
+        addIfAbsentTopicCtrlConf(topicNameSet, createUsr, sBuilder, result);
+        result.clear();
+        // add topic deployment record
+        for (Integer brokerId : brokerIdSet) {
+            BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
+            if (brokerConf == null) {
+                result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                        DataOpErrCode.DERR_NOT_EXIST.getDescription());
+                retInfo.add(new TopicProcessResult(brokerId, "", result));
+                continue;
+            }
+            for (String topicName : topicNameSet) {
+                TopicDeployEntity deployInfo = getTopicConfInfo(brokerId, topicName);
+                if (deployInfo != null) {
+                    result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                            DataOpErrCode.DERR_EXISTED.getDescription());
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                deployConf = new TopicDeployEntity(dataVerId, createUsr, createDate);
+                deployConf.setTopicDeployInfo(brokerConf.getBrokerId(),
+                        brokerConf.getBrokerIp(), brokerConf.getBrokerPort(), topicName);
+                deployConf.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+                        TBaseConstants.META_VALUE_UNDEFINED, null,
+                        TopicStatus.STATUS_TOPIC_OK, topicPropInfo);
+                metaStoreService.addTopicConf(deployConf, sBuilder, result);
+                retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+            }
+        }
+        return retInfo;
     }
 
-    public Map<String, List<TopicDeployConfEntity>> getTopicConfMapByTopicAndBrokerIds(
-            Set<String> topicNameSet, Set<Integer> brokerIdSet) {
-        return metaStoreService.getTopicDepInfoByTopicBrokerId(topicNameSet, brokerIdSet);
+    public TopicProcessResult addTopicDeployInfo(TopicDeployEntity deployEntity,
+                                                 StringBuilder sBuilder,
+                                                 ProcessResult result) {
+        // add topic control info
+        addIfAbsentTopicCtrlConf(deployEntity.getTopicName(),
+                TBaseConstants.META_VALUE_UNDEFINED,
+                deployEntity.getCreateUser(), sBuilder, result);
+        BrokerConfEntity brokerConf = getBrokerConfByBrokerId(deployEntity.getBrokerId());
+        if (brokerConf == null) {
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                    DataOpErrCode.DERR_NOT_EXIST.getDescription());
+            return new TopicProcessResult(deployEntity.getBrokerId(), "", result);
+        }
+        // add topic deployment record
+        TopicDeployEntity curDeployInfo =
+                metaStoreService.getTopicConfByeRecKey(deployEntity.getRecordKey());
+        if (curDeployInfo != null) {
+            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                    DataOpErrCode.DERR_EXISTED.getDescription());
+            return new TopicProcessResult(deployEntity.getBrokerId(),
+                    deployEntity.getTopicName(), result);
+        }
+        metaStoreService.addTopicConf(deployEntity, sBuilder, result);
+        return new TopicProcessResult(deployEntity.getBrokerId(),
+                deployEntity.getTopicName(), result);
     }
 
     /**
-     * Add topic configure
+     * Modify topic config
      *
-     * @param entity     the topic control info entity will be add
-     * @param strBuffer  the print info string buffer
      * @param result     the process result return
      * @return true if success otherwise false
      */
-    public boolean confAddTopicConfig(TopicDeployConfEntity entity,
-                                      StringBuilder strBuffer,
-                                      ProcessResult result) {
-        BrokerConfEntity brkEntity =
-                metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId());
-        if (brkEntity == null) {
-            result.setFailResult(DataOpErrCode.DERR_CONDITION_LACK.getCode(),
-                    "Miss broker configure, please create broker configure first!");
-            return result.isSuccess();
+    public List<TopicProcessResult> modTopicConfig(long dataVerId, String modifyUser,
+                                                   Date modifyDate, Set<Integer> brokerIdSet,
+                                                   Set<String> topicNameSet,
+                                                   TopicPropGroup topicProps,
+                                                   StringBuilder sBuilder,
+                                                   ProcessResult result) {
+        List<TopicProcessResult> retInfo = new ArrayList<>();
+        // add topic control info
+        addIfAbsentTopicCtrlConf(topicNameSet, modifyUser, sBuilder, result);
+        result.clear();
+        // add topic deployment record
+        TopicDeployEntity curEntity;
+        TopicDeployEntity newEntity;
+        for (Integer brokerId : brokerIdSet) {
+            BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
+            if (brokerConf == null) {
+                result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                        DataOpErrCode.DERR_NOT_EXIST.getDescription());
+                retInfo.add(new TopicProcessResult(brokerId, "", result));
+                continue;
+            }
+            for (String topicName : topicNameSet) {
+                curEntity = getTopicConfInfo(brokerId, topicName);
+                if (curEntity == null) {
+                    result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                            DataOpErrCode.DERR_NOT_EXIST.getDescription());
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                if (!curEntity.isValidTopicStatus()) {
+                    result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+                            sBuilder.append("Topic of ").append(topicName)
+                                    .append("is deleted softly in brokerId=").append(brokerId)
+                                    .append(", please resume the record or hard removed first!")
+                                    .toString());
+                    sBuilder.delete(0, sBuilder.length());
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                if (topicProps != null) {
+                    if (topicProps.getNumPartitions() != TBaseConstants.META_VALUE_UNDEFINED
+                            && topicProps.getNumPartitions() < curEntity.getNumPartitions()) {
+                        result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+                                sBuilder.append("Partition value is less than before,")
+                                        .append("please confirm the configure first! brokerId=")
+                                        .append(curEntity.getBrokerId()).append(", topicName=")
+                                        .append(curEntity.getTopicName())
+                                        .append(", old Partition value is ")
+                                        .append(curEntity.getNumPartitions())
+                                        .append(", new Partition value is ")
+                                        .append(topicProps.getNumPartitions()).toString());
+                        sBuilder.delete(0, sBuilder.length());
+                        retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                        continue;
+                    }
+                    if (topicProps.getNumTopicStores() != TBaseConstants.META_VALUE_UNDEFINED
+                            && topicProps.getNumTopicStores() < curEntity.getNumTopicStores()) {
+                        result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+                                sBuilder.append("TopicStores value is less than before,")
+                                        .append("please confirm the configure first! brokerId=")
+                                        .append(curEntity.getBrokerId()).append(", topicName=")
+                                        .append(curEntity.getTopicName())
+                                        .append(", old TopicStores value is ")
+                                        .append(curEntity.getNumTopicStores())
+                                        .append(", new TopicStores value is ")
+                                        .append(topicProps.getNumTopicStores()).toString());
+                        sBuilder.delete(0, sBuilder.length());
+                        retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                        continue;
+                    }
+                }
+                newEntity = curEntity.clone();
+                newEntity.updBaseModifyInfo(dataVerId,
+                        null, null, modifyUser, modifyDate, null);
+                if (!newEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+                        TBaseConstants.META_VALUE_UNDEFINED, null, null, topicProps)) {
+                    result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                            sBuilder.append("Data not changed for brokerId=")
+                                    .append(curEntity.getBrokerId()).append(", topicName=")
+                                    .append(curEntity.getTopicName()).toString());
+                    sBuilder.delete(0, sBuilder.length());
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                metaStoreService.updTopicConf(newEntity, sBuilder, result);
+                retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+            }
         }
-        if (metaStoreService.addTopicConf(entity, result)) {
-            strBuffer.append("[confAddTopicConfig], ")
-                    .append(entity.getCreateUser())
-                    .append(" added topic configure record :")
-                    .append(entity.toString());
-            logger.info(strBuffer.toString());
-        } else {
-            strBuffer.append("[confAddTopicConfig], ")
-                    .append("failure to add topic configure record : ")
-                    .append(result.getErrInfo());
-            logger.warn(strBuffer.toString());
+        return retInfo;
+    }
+
+    /**
+     * Modify topic config
+     *
+     * @param result     the process result return
+     * @return true if success otherwise false
+     */
+    public List<TopicProcessResult> modDelOrRmvTopicConf(long dataVerId, String modifyUser,
+                                                         Date modifyDate, Set<Integer> brokerIdSet,
+                                                         Set<String> topicNameSet,
+                                                         TopicStatus topicStatus,
+                                                         StringBuilder sBuilder,
+                                                         ProcessResult result) {
+        TopicDeployEntity curEntity;
+        TopicDeployEntity newEntity;
+        List<TopicProcessResult> retInfo = new ArrayList<>();
+        // add topic deployment record
+        for (Integer brokerId : brokerIdSet) {
+            BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
+            if (brokerConf == null) {
+                result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                        DataOpErrCode.DERR_NOT_EXIST.getDescription());
+                retInfo.add(new TopicProcessResult(brokerId, "", result));
+                continue;
+            }
+            for (String topicName : topicNameSet) {
+                curEntity = getTopicConfInfo(brokerId, topicName);
+                if (curEntity == null) {
+                    result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                            DataOpErrCode.DERR_NOT_EXIST.getDescription());
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                if (curEntity.isAcceptPublish()
+                        || curEntity.isAcceptSubscribe()) {  // still accept publish and subscribe
+                    result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+                            sBuilder.append("The topic ").append(topicName)
+                                    .append("'s acceptPublish and acceptSubscribe status must be false in broker=")
+                                    .append(brokerId).append(" before topic deleted!").toString());
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                if ((topicStatus == TopicStatus.STATUS_TOPIC_SOFT_DELETE
+                        && !curEntity.isValidTopicStatus())
+                        || (topicStatus == TopicStatus.STATUS_TOPIC_SOFT_REMOVE
+                        && curEntity.getTopicStatus() != TopicStatus.STATUS_TOPIC_SOFT_DELETE)) {
+                    result.setSuccResult("");
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                newEntity = curEntity.clone();
+                newEntity.updBaseModifyInfo(dataVerId,
+                        null, null, modifyUser, modifyDate, null);
+                if (!newEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+                        TBaseConstants.META_VALUE_UNDEFINED, null, topicStatus, null)) {
+                    result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                            sBuilder.append("Data not changed for brokerId=")
+                                    .append(curEntity.getBrokerId()).append(", topicName=")
+                                    .append(curEntity.getTopicName()).toString());
+                    sBuilder.delete(0, sBuilder.length());
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                metaStoreService.updTopicConf(newEntity, sBuilder, result);
+                retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+            }
         }
-        strBuffer.delete(0, strBuffer.length());
-        // add topic control record
-        addIfAbsentTopicCtrlConf(entity.getTopicName(), entity.getTopicId(),
-                entity.getCreateUser(), strBuffer, new ProcessResult());
-        return result.isSuccess();
+        return retInfo;
     }
 
     /**
      * Modify topic config
      *
-     * @param entity     the topic control info entity will be update
-     * @param strBuffer  the print info string buffer
      * @param result     the process result return
      * @return true if success otherwise false
      */
-    public boolean confModTopicConfig(TopicDeployConfEntity entity,
-                                      StringBuilder strBuffer,
-                                      ProcessResult result) {
-        if (metaStoreService.updTopicConf(entity, result)) {
-            TopicDeployConfEntity oldEntity =
-                    (TopicDeployConfEntity) result.getRetData();
-            TopicDeployConfEntity curEntity =
-                    metaStoreService.getTopicConfByeRecKey(entity.getRecordKey());
-            strBuffer.append("[confModTopicConfig], ")
-                    .append(entity.getModifyUser())
-                    .append(" updated record from :")
-                    .append(oldEntity.toString())
-                    .append(" to ").append(curEntity.toString());
-            logger.info(strBuffer.toString());
-        } else {
-            strBuffer.append("[confModTopicConfig], ")
-                    .append("failure to update topic configure record : ")
-                    .append(result.getErrInfo());
-            logger.warn(strBuffer.toString());
+    public List<TopicProcessResult> modRedoDelTopicConf(long dataVerId, String modifyUser,
+                                                        Date modifyDate, Set<Integer> brokerIdSet,
+                                                        Set<String> topicNameSet,
+                                                        StringBuilder sBuilder,
+                                                        ProcessResult result) {
+        TopicDeployEntity curEntity;
+        TopicDeployEntity newEntity;
+        List<TopicProcessResult> retInfo = new ArrayList<>();
+        // add topic deployment record
+        for (Integer brokerId : brokerIdSet) {
+            BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
+            if (brokerConf == null) {
+                result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                        DataOpErrCode.DERR_NOT_EXIST.getDescription());
+                retInfo.add(new TopicProcessResult(brokerId, "", result));
+                continue;
+            }
+            for (String topicName : topicNameSet) {
+                curEntity = getTopicConfInfo(brokerId, topicName);
+                if (curEntity == null) {
+                    result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                            DataOpErrCode.DERR_NOT_EXIST.getDescription());
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                if (curEntity.isAcceptPublish()
+                        || curEntity.isAcceptSubscribe()) {  // still accept publish and subscribe
+                    result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+                            sBuilder.append("The topic ").append(topicName)
+                                    .append("'s acceptPublish and acceptSubscribe status must be false in broker=")
+                                    .append(brokerId).append(" before topic deleted!").toString());
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                if (curEntity.getTopicStatus() != TopicStatus.STATUS_TOPIC_SOFT_DELETE) {
+                    if (curEntity.isValidTopicStatus()) {
+                        result.setSuccResult(null);
+                    } else {
+                        result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+                                sBuilder.append("Topic of ").append(topicName)
+                                        .append("is in removing flow in brokerId=")
+                                        .append(curEntity.getBrokerId())
+                                        .append(", please wait until remove process finished!")
+                                        .toString());
+                        sBuilder.delete(0, sBuilder.length());
+                    }
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                newEntity = curEntity.clone();
+                newEntity.updBaseModifyInfo(dataVerId,
+                        null, null, modifyUser, modifyDate, null);
+                if (!newEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+                        TBaseConstants.META_VALUE_UNDEFINED, null,
+                        TopicStatus.STATUS_TOPIC_OK, null)) {
+                    result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                            sBuilder.append("Data not changed for brokerId=")
+                                    .append(curEntity.getBrokerId()).append(", topicName=")
+                                    .append(curEntity.getTopicName()).toString());
+                    sBuilder.delete(0, sBuilder.length());
+                    retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+                    continue;
+                }
+                metaStoreService.updTopicConf(newEntity, sBuilder, result);
+                retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+            }
         }
-        strBuffer.delete(0, strBuffer.length());
-        // add topic control record
-        addIfAbsentTopicCtrlConf(entity.getTopicName(), entity.getTopicId(),
-                entity.getCreateUser(), strBuffer, new ProcessResult());
-        return result.isSuccess();
+        return retInfo;
+    }
+
+
+    /**
+     * Get broker topic entity, if query entity is null, return all topic entity
+     *
+     * @param qryEntity query conditions
+     * @return topic entity map
+     */
+    public Map<String, List<TopicDeployEntity>> getTopicConfEntityMap(Set<String> topicNameSet,
+                                                                      Set<Integer> brokerIdSet,
+                                                                      TopicDeployEntity qryEntity) {
+        return metaStoreService.getTopicConfMap(topicNameSet, brokerIdSet, qryEntity);
+    }
+
+    public TopicDeployEntity getTopicConfInfo(int brokerId, String topicName) {
+        return metaStoreService.getTopicConf(brokerId, topicName);
+    }
+
+    /**
+     * Get broker topic entity, if query entity is null, return all topic entity
+     *
+     * @return topic entity map
+     */
+    public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+                                                                       Set<Integer> brokerIdSet) {
+        return metaStoreService.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
+    }
+
+    public Map<String, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
+            Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+        return metaStoreService.getTopicDepInfoByTopicBrokerId(topicNameSet, brokerIdSet);
     }
 
+
+
+
     private boolean confDelTopicConfInfo(String operator,
                                          String recordKey,
                                          StringBuilder strBuffer,
                                          ProcessResult result) {
-        if (metaStoreService.delTopicConf(recordKey, result)) {
-            GroupResCtrlEntity entity =
-                    (GroupResCtrlEntity) result.getRetData();
-            if (entity != null) {
-                strBuffer.append("[confDelTopicConfInfo], ").append(operator)
-                        .append(" deleted topic configure record :")
-                        .append(entity.toString());
-                logger.info(strBuffer.toString());
-            }
-        } else {
-            strBuffer.append("[confDelTopicConfInfo], ")
-                    .append("failure to delete topic configure record : ")
-                    .append(result.getErrInfo());
-            logger.warn(strBuffer.toString());
-        }
-        strBuffer.delete(0, strBuffer.length());
-        return result.isSuccess();
+        return metaStoreService.delTopicConf(operator,
+                recordKey, strBuffer, result);
     }
 
     public List<String> getBrokerTopicStrConfigInfo(
@@ -1006,7 +1249,7 @@ public class MetaDataManager implements Server {
     private List<String> inGetTopicConfStrInfo(BrokerConfEntity brokerEntity,
                                                boolean isRemoved, StringBuilder sBuffer) {
         List<String> topicConfStrs = new ArrayList<>();
-        Map<String, TopicDeployConfEntity> topicEntityMap =
+        Map<String, TopicDeployEntity> topicEntityMap =
                 metaStoreService.getConfiguredTopicInfo(brokerEntity.getBrokerId());
         if (topicEntityMap.isEmpty()) {
             return topicConfStrs;
@@ -1015,7 +1258,7 @@ public class MetaDataManager implements Server {
         ClusterSettingEntity clusterDefConf =
                 metaStoreService.getClusterConfig();
         int defMsgSizeInB = clusterDefConf.getMaxMsgSizeInB();
-        for (TopicDeployConfEntity topicEntity : topicEntityMap.values()) {
+        for (TopicDeployEntity topicEntity : topicEntityMap.values()) {
             /*
              * topic:partNum:acceptPublish:acceptSubscribe:unflushThreshold:unflushInterval:deleteWhen:
              * deletePolicy:filterStatusId:statusId
@@ -1114,19 +1357,7 @@ public class MetaDataManager implements Server {
     public boolean confAddTopicCtrlConf(TopicCtrlEntity entity,
                                         StringBuilder strBuffer,
                                         ProcessResult result) {
-        if (metaStoreService.addTopicCtrlConf(entity, result)) {
-            strBuffer.append("[confAddTopicCtrlConf], ")
-                    .append(entity.getCreateUser())
-                    .append(" added topic control record :")
-                    .append(entity.toString());
-            logger.info(strBuffer.toString());
-        } else {
-            strBuffer.append("[confAddTopicCtrlConf], ")
-                    .append("failure to add topic control record : ")
-                    .append(result.getErrInfo());
-            logger.warn(strBuffer.toString());
-        }
-        strBuffer.delete(0, strBuffer.length());
+        metaStoreService.addTopicCtrlConf(entity, strBuffer, result);
         return result.isSuccess();
     }
 
@@ -1138,34 +1369,57 @@ public class MetaDataManager implements Server {
      * @param operator   operator
      * @param strBuffer  the print info string buffer
      */
-    private void addIfAbsentTopicCtrlConf(String topicName,
-                                          int topicNameId,
-                                          String operator,
-                                          StringBuilder strBuffer,
-                                          ProcessResult result) {
+    public void addIfAbsentTopicCtrlConf(String topicName,
+                                         int topicNameId,
+                                         String operator,
+                                         StringBuilder strBuffer,
+                                         ProcessResult result) {
         TopicCtrlEntity curEntity =
                 metaStoreService.getTopicCtrlConf(topicName);
         if (curEntity != null) {
             return;
         }
-        curEntity = new TopicCtrlEntity(topicName, topicNameId, operator);
-        if (metaStoreService.addTopicCtrlConf(curEntity, result)) {
-            strBuffer.append("[addIfAbsentTopicCtrlConf], ")
-                    .append(curEntity.getCreateUser())
-                    .append(" added topic control record :")
-                    .append(curEntity.toString());
-            logger.info(strBuffer.toString());
-        } else {
-            strBuffer.append("[addIfAbsentTopicCtrlConf], ")
-                    .append("failure to add topic control record : ")
-                    .append(result.getErrInfo());
-            logger.warn(strBuffer.toString());
+        int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
+        ClusterSettingEntity defSetting = metaStoreService.getClusterConfig();
+        if (defSetting != null) {
+            maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
         }
-        strBuffer.delete(0, strBuffer.length());
+        curEntity = new TopicCtrlEntity(topicName, topicNameId, maxMsgSizeInMB, operator);
+        metaStoreService.addTopicCtrlConf(curEntity, strBuffer, result);
         return;
     }
 
     /**
+     * Add if absent topic control configure info
+     *
+     * @param topicNameSet  the topic name will be add
+     * @param operator the topic name id will be add
+     * @param operator   operator
+     * @param strBuffer  the print info string buffer
+     */
+    public void addIfAbsentTopicCtrlConf(Set<String> topicNameSet,
+                                         String operator,
+                                         StringBuilder strBuffer,
+                                         ProcessResult result) {
+        TopicCtrlEntity curEntity;
+        int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
+        ClusterSettingEntity defSetting = metaStoreService.getClusterConfig();
+        if (defSetting != null) {
+            maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
+        }
+        for (String topicName : topicNameSet) {
+            result.clear();
+            curEntity = metaStoreService.getTopicCtrlConf(topicName);
+            if (curEntity != null) {
+                continue;
+            }
+            curEntity = new TopicCtrlEntity(topicName,
+                    TBaseConstants.META_VALUE_UNDEFINED, maxMsgSizeInMB, operator);
+            metaStoreService.addTopicCtrlConf(curEntity, strBuffer, result);
+        }
+    }
+
+    /**
      * Update topic control configure
      *
      * @param entity     the topic control info entity will be update
@@ -1231,6 +1485,20 @@ public class MetaDataManager implements Server {
         return this.metaStoreService.getTopicCtrlConf(topicName);
     }
 
+    public int getTopicMaxMsgSizeInMB(String topicName) {
+        // get maxMsgSizeInMB info
+        int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
+        ClusterSettingEntity clusterSettingEntity = getClusterDefSetting();
+        if (clusterSettingEntity != null) {
+            maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB();
+        }
+        TopicCtrlEntity topicCtrlEntity = getTopicCtrlByTopicName(topicName);
+        if (topicCtrlEntity != null) {
+            maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
+        }
+        return maxMsgSizeInMB;
+    }
+
     /**
      * Get topic control entity list
      *
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
index 0878e30..2aa4a57 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
@@ -68,21 +68,21 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlac
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.BrokerConfigMapper;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.ClusterConfigMapper;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.GroupBlackListMapper;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.GroupResCtrlMapper;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicCtrlMapper;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployConfigMapper;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployMapper;
 import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbBrokerConfigMapperImpl;
 import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbClusterConfigMapperImpl;
 import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbGroupBlackListMapperImpl;
 import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbGroupConsumeCtrlMapperImpl;
 import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbGroupResCtrlMapperImpl;
 import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbTopicCtrlMapperImpl;
-import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbTopicDeployConfigMapperImpl;
+import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbTopicDeployMapperImpl;
 import org.apache.tubemq.server.master.utils.BdbStoreSamplePrint;
 import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
 import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
@@ -143,7 +143,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
     // broker configure
     private BrokerConfigMapper brokerConfigMapper;
     // topic configure
-    private TopicDeployConfigMapper topicDeployConfigMapper;
+    private TopicDeployMapper topicDeployMapper;
     // topic control configure
     private TopicCtrlMapper topicCtrlMapper;
     // group configure
@@ -205,7 +205,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
         logger.info("[BDB Impl] Stopping StoreManagerService...");
         // close bdb configure
         brokerConfigMapper.close();
-        topicDeployConfigMapper.close();
+        topicDeployMapper.close();
         groupResCtrlMapper.close();
         topicCtrlMapper.close();
         groupBlackListMapper.close();
@@ -413,30 +413,84 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
 
     // topic configure api
     @Override
-    public boolean addTopicConf(TopicDeployConfEntity entity, ProcessResult result) {
+    public boolean addTopicConf(TopicDeployEntity entity,
+                                StringBuilder strBuffer,
+                                ProcessResult result) {
         // check current status
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        return topicDeployConfigMapper.addTopicConf(entity, result);
+        if (topicDeployMapper.addTopicConf(entity, result)) {
+            strBuffer.append("[addTopicConf], ")
+                    .append(entity.getCreateUser())
+                    .append(" added topic configure record :")
+                    .append(entity.toString());
+            logger.info(strBuffer.toString());
+        } else {
+            strBuffer.append("[addTopicConf], ")
+                    .append("failure to add topic configure record : ")
+                    .append(result.getErrInfo());
+            logger.warn(strBuffer.toString());
+        }
+        strBuffer.delete(0, strBuffer.length());
+        return result.isSuccess();
     }
 
     @Override
-    public boolean updTopicConf(TopicDeployConfEntity entity, ProcessResult result) {
+    public boolean updTopicConf(TopicDeployEntity entity,
+                                StringBuilder strBuffer,
+                                ProcessResult result) {
         // check current status
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        return topicDeployConfigMapper.updTopicConf(entity, result);
+        if (topicDeployMapper.updTopicConf(entity, result)) {
+            TopicDeployEntity oldEntity =
+                    (TopicDeployEntity) result.getRetData();
+            TopicDeployEntity curEntity =
+                    topicDeployMapper.getTopicConfByeRecKey(entity.getRecordKey());
+            strBuffer.append("[updTopicConf], ")
+                    .append(entity.getModifyUser())
+                    .append(" updated record from :")
+                    .append(oldEntity.toString())
+                    .append(" to ").append(curEntity.toString());
+            logger.info(strBuffer.toString());
+        } else {
+            strBuffer.append("[updTopicConf], ")
+                    .append("failure to update topic configure record : ")
+                    .append(result.getErrInfo());
+            logger.warn(strBuffer.toString());
+        }
+        strBuffer.delete(0, strBuffer.length());
+        return result.isSuccess();
     }
 
     @Override
-    public boolean delTopicConf(String recordKey, ProcessResult result) {
+    public boolean delTopicConf(String operator,
+                                String recordKey,
+                                StringBuilder strBuffer,
+                                ProcessResult result) {
         // check current status
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        return topicDeployConfigMapper.delTopicConf(recordKey, result);
+        if (topicDeployMapper.delTopicConf(recordKey, result)) {
+            GroupResCtrlEntity entity =
+                    (GroupResCtrlEntity) result.getRetData();
+            if (entity != null) {
+                strBuffer.append("[delTopicConf], ").append(operator)
+                        .append(" deleted topic configure record :")
+                        .append(entity.toString());
+                logger.info(strBuffer.toString());
+            }
+        } else {
+            strBuffer.append("[delTopicConf], ")
+                    .append("failure to delete topic configure record : ")
+                    .append(result.getErrInfo());
+            logger.warn(strBuffer.toString());
+        }
+        strBuffer.delete(0, strBuffer.length());
+        return result.isSuccess();
     }
 
     @Override
@@ -448,7 +502,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        if (topicDeployConfigMapper.delTopicConfByBrokerId(brokerId, result)) {
+        if (topicDeployMapper.delTopicConfByBrokerId(brokerId, result)) {
             strBuffer.append("[delTopicConfByBrokerId], ")
                     .append(operator)
                     .append(" deleted topic deploy record :")
@@ -466,59 +520,86 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
 
     @Override
     public boolean hasConfiguredTopics(int brokerId) {
-        return topicDeployConfigMapper.hasConfiguredTopics(brokerId);
+        return topicDeployMapper.hasConfiguredTopics(brokerId);
+    }
+
+    @Override
+    public TopicDeployEntity getTopicConfByeRecKey(String recordKey) {
+        return topicDeployMapper.getTopicConfByeRecKey(recordKey);
     }
 
     @Override
-    public TopicDeployConfEntity getTopicConfByeRecKey(String recordKey) {
-        return topicDeployConfigMapper.getTopicConfByeRecKey(recordKey);
+    public List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity) {
+        return topicDeployMapper.getTopicConf(qryEntity);
     }
 
     @Override
-    public List<TopicDeployConfEntity> getTopicConf(TopicDeployConfEntity qryEntity) {
-        return topicDeployConfigMapper.getTopicConf(qryEntity);
+    public TopicDeployEntity getTopicConf(int brokerId, String topicName) {
+        return topicDeployMapper.getTopicConf(brokerId, topicName);
     }
 
     @Override
     public Map<Integer, Set<String>> getConfiguredTopicInfo(Set<Integer> brokerIdSet) {
-        return topicDeployConfigMapper.getConfiguredTopicInfo(brokerIdSet);
+        return topicDeployMapper.getConfiguredTopicInfo(brokerIdSet);
     }
 
     @Override
     public Map<String, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet) {
-        return topicDeployConfigMapper.getTopicBrokerInfo(topicNameSet);
+        return topicDeployMapper.getTopicBrokerInfo(topicNameSet);
     }
 
     @Override
     public Set<String> getConfiguredTopicSet() {
-        return topicDeployConfigMapper.getConfiguredTopicSet();
+        return topicDeployMapper.getConfiguredTopicSet();
     }
 
     @Override
-    public Map<String/* topicName */, List<TopicDeployConfEntity>> getTopicConfMap(
-            TopicDeployConfEntity qryEntity) {
-        return topicDeployConfigMapper.getTopicConfMap(qryEntity);
+    public Map<String/* topicName */, List<TopicDeployEntity>> getTopicConfMap(
+            Set<String> topicNameSet, Set<Integer> brokerIdSet, TopicDeployEntity qryEntity) {
+        return topicDeployMapper.getTopicConfMap(topicNameSet, brokerIdSet, qryEntity);
     }
 
     @Override
-    public Map<String, List<TopicDeployConfEntity>>getTopicDepInfoByTopicBrokerId(
+    public Map<Integer/* brokerId */, List<TopicDeployEntity>> getTopicDeployInfoMap(
+            Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+        return topicDeployMapper.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
+    }
+
+
+    @Override
+    public Map<String, List<TopicDeployEntity>>getTopicDepInfoByTopicBrokerId(
             Set<String> topicSet, Set<Integer> brokerIdSet) {
-        return topicDeployConfigMapper.getTopicConfMapByTopicAndBrokerIds(topicSet, brokerIdSet);
+        return topicDeployMapper.getTopicConfMapByTopicAndBrokerIds(topicSet, brokerIdSet);
     }
 
     @Override
-    public Map<String, TopicDeployConfEntity> getConfiguredTopicInfo(int brokerId) {
-        return topicDeployConfigMapper.getConfiguredTopicInfo(brokerId);
+    public Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId) {
+        return topicDeployMapper.getConfiguredTopicInfo(brokerId);
     }
 
     // topic control api
     @Override
-    public boolean addTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result) {
+    public boolean addTopicCtrlConf(TopicCtrlEntity entity,
+                                    StringBuilder sBuffer,
+                                    ProcessResult result) {
         // check current status
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        return topicCtrlMapper.addTopicCtrlConf(entity, result);
+        if (topicCtrlMapper.addTopicCtrlConf(entity, result)) {
+            sBuffer.append("[addTopicCtrlConf], ")
+                    .append(entity.getCreateUser())
+                    .append(" added topic control record :")
+                    .append(entity.toString());
+            logger.info(sBuffer.toString());
+        } else {
+            sBuffer.append("[addTopicCtrlConf], ")
+                    .append("failure to add topic control record : ")
+                    .append(result.getErrInfo());
+            logger.warn(sBuffer.toString());
+        }
+        sBuffer.delete(0, sBuffer.length());
+        return result.isSuccess();
     }
 
     @Override
@@ -1270,7 +1351,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
     private void initMetaStore() {
         clusterConfigMapper = new BdbClusterConfigMapperImpl(repEnv, storeConfig);
         brokerConfigMapper = new BdbBrokerConfigMapperImpl(repEnv, storeConfig);
-        topicDeployConfigMapper =  new BdbTopicDeployConfigMapperImpl(repEnv, storeConfig);
+        topicDeployMapper =  new BdbTopicDeployMapperImpl(repEnv, storeConfig);
         groupResCtrlMapper = new BdbGroupResCtrlMapperImpl(repEnv, storeConfig);
         topicCtrlMapper = new BdbTopicCtrlMapperImpl(repEnv, storeConfig);
         groupConsumeCtrlMapper = new BdbGroupConsumeCtrlMapperImpl(repEnv, storeConfig);
@@ -1282,7 +1363,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
         clearCachedRunData();
         clusterConfigMapper.loadConfig();
         brokerConfigMapper.loadConfig();
-        topicDeployConfigMapper.loadConfig();
+        topicDeployMapper.loadConfig();
         topicCtrlMapper.loadConfig();
         groupResCtrlMapper.loadConfig();
         groupBlackListMapper.loadConfig();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index b765543..362baf1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -30,7 +30,7 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlac
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 
 
 public interface MetaStoreService extends KeepAlive, Server {
@@ -129,11 +129,18 @@ public interface MetaStoreService extends KeepAlive, Server {
     BrokerConfEntity getBrokerConfByBrokerIp(String brokerIp);
 
     // topic configure api
-    boolean addTopicConf(TopicDeployConfEntity entity, ProcessResult result);
+    boolean addTopicConf(TopicDeployEntity entity,
+                         StringBuilder strBuffer,
+                         ProcessResult result);
 
-    boolean updTopicConf(TopicDeployConfEntity entity, ProcessResult result);
+    boolean updTopicConf(TopicDeployEntity entity,
+                         StringBuilder strBuffer,
+                         ProcessResult result);
 
-    boolean delTopicConf(String recordKey, ProcessResult result);
+    boolean delTopicConf(String operator,
+                         String recordKey,
+                         StringBuilder strBuffer,
+                         ProcessResult result);
 
     boolean delTopicConfByBrokerId(String operator,
                                    int brokerId,
@@ -142,9 +149,11 @@ public interface MetaStoreService extends KeepAlive, Server {
 
     boolean hasConfiguredTopics(int brokerId);
 
-    TopicDeployConfEntity getTopicConfByeRecKey(String recordKey);
+    TopicDeployEntity getTopicConfByeRecKey(String recordKey);
 
-    List<TopicDeployConfEntity> getTopicConf(TopicDeployConfEntity qryEntity);
+    List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity);
+
+    TopicDeployEntity getTopicConf(int brokerId, String topicName);
 
     Set<String> getConfiguredTopicSet();
 
@@ -152,16 +161,21 @@ public interface MetaStoreService extends KeepAlive, Server {
 
     Map<String/* topicName */, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet);
 
-    Map<String/* topicName */, List<TopicDeployConfEntity>> getTopicConfMap(
-            TopicDeployConfEntity qryEntity);
+    Map<String/* topicName */, List<TopicDeployEntity>> getTopicConfMap(
+            Set<String> topicNameSet, Set<Integer> brokerIdSet, TopicDeployEntity qryEntity);
+
+    Map<Integer/* brokerId */, List<TopicDeployEntity>> getTopicDeployInfoMap(
+            Set<String> topicNameSet, Set<Integer> brokerIdSet);
 
-    Map<String/* topicName */, List<TopicDeployConfEntity>> getTopicDepInfoByTopicBrokerId(
+    Map<String/* topicName */, List<TopicDeployEntity>> getTopicDepInfoByTopicBrokerId(
             Set<String> topicSet, Set<Integer> brokerIdSet);
 
-    Map<String, TopicDeployConfEntity> getConfiguredTopicInfo(int brokerId);
+    Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId);
 
     // topic control api
-    boolean addTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
+    boolean addTopicCtrlConf(TopicCtrlEntity entity,
+                             StringBuilder sBuffer,
+                             ProcessResult result);
 
     boolean updTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
index d861511..7beb57e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
@@ -45,6 +45,7 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
     private int brokerWebPort = TBaseConstants.META_VALUE_UNDEFINED;
     private TopicPropGroup clsDefTopicProps = new TopicPropGroup();
     private int maxMsgSizeInB = TBaseConstants.META_VALUE_UNDEFINED;
+    private int maxMsgSizeInMB = TBaseConstants.META_VALUE_UNDEFINED;
     private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
     private EnableStatus gloFlowCtrlStatus = EnableStatus.STATUS_UNDEFINE;
     // flow control rule count
@@ -77,6 +78,8 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
                         bdbEntity.getDeletePolicy(), bdbEntity.getDefDataType(),
                         bdbEntity.getDefDataPath());
         this.maxMsgSizeInB = bdbEntity.getMaxMsgSizeInB();
+        this.maxMsgSizeInMB =
+                this.maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE;
         this.qryPriorityId = bdbEntity.getQryPriorityId();
         setEnableFlowCtrl(bdbEntity.getEnableGloFlowCtrl());
         setGloFlowCtrlInfo(bdbEntity.getGloFlowCtrlCnt(), bdbEntity.getGloFlowCtrlInfo());
@@ -112,9 +115,9 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
         this.brokerPort = TBaseConstants.META_DEFAULT_BROKER_PORT;
         this.brokerTLSPort = TBaseConstants.META_DEFAULT_BROKER_TLS_PORT;
         this.brokerWebPort = TBaseConstants.META_DEFAULT_BROKER_WEB_PORT;
+        this.maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
         this.maxMsgSizeInB =
-                SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(
-                        TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
+                SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(this.maxMsgSizeInMB);
         this.qryPriorityId = TServerConstants.QRY_PRIORITY_DEF_VALUE;
         this.gloFlowCtrlStatus = EnableStatus.STATUS_DISABLE;
         this.gloFlowCtrlRuleCnt = 0;
@@ -151,10 +154,12 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
         }
         // check and set modified field
         if (newMaxMsgSizeMB != TBaseConstants.META_VALUE_UNDEFINED) {
-            newMaxMsgSizeMB = SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(newMaxMsgSizeMB);
-            if (this.maxMsgSizeInB != newMaxMsgSizeMB) {
+            int newMaxMsgSizeB =
+                    SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(newMaxMsgSizeMB);
+            if (this.maxMsgSizeInB != newMaxMsgSizeB) {
                 changed = true;
-                this.maxMsgSizeInB = newMaxMsgSizeMB;
+                this.maxMsgSizeInB = newMaxMsgSizeB;
+                this.maxMsgSizeInMB = newMaxMsgSizeMB;
             }
         }
         // check and set qry priority id
@@ -206,6 +211,10 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
         return maxMsgSizeInB;
     }
 
+    public int getMaxMsgSizeInMB() {
+        return maxMsgSizeInMB;
+    }
+
     public int getQryPriorityId() {
         return qryPriorityId;
     }
@@ -260,15 +269,11 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
     public StringBuilder toWebJsonStr(StringBuilder sBuilder,
                                       boolean isLongName,
                                       boolean fullFormat) {
-        int tmpMsgSizeInMB = maxMsgSizeInB;
-        if (maxMsgSizeInB != TBaseConstants.META_VALUE_UNDEFINED) {
-            tmpMsgSizeInMB /= TBaseConstants.META_MB_UNIT_SIZE;
-        }
         if (isLongName) {
             sBuilder.append("{\"brokerPort\":").append(brokerPort)
                     .append(",\"brokerTLSPort\":").append(brokerTLSPort)
                     .append(",\"brokerWebPort\":").append(brokerWebPort)
-                    .append(",\"maxMsgSizeInMB\":").append(tmpMsgSizeInMB)
+                    .append(",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB)
                     .append(",\"qryPriorityId\":").append(qryPriorityId)
                     .append(",\"flowCtrlEnable\":").append(gloFlowCtrlStatus.isEnable())
                     .append(",\"flowCtrlRuleCount\":").append(gloFlowCtrlRuleCnt)
@@ -277,7 +282,7 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
             sBuilder.append("{\"bPort\":").append(brokerPort)
                     .append(",\"bTlsPort\":").append(brokerTLSPort)
                     .append(",\"bWebPort\":").append(brokerWebPort)
-                    .append(",\"mxMsgInMB\":").append(tmpMsgSizeInMB)
+                    .append(",\"mxMsgInMB\":").append(maxMsgSizeInMB)
                     .append(",\"qryPriId\":").append(qryPriorityId)
                     .append(",\"fCtrlEn\":").append(gloFlowCtrlStatus.isEnable())
                     .append(",\"fCtrlCnt\":").append(gloFlowCtrlRuleCnt)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
index 0bd8f9e..c4b80e4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
@@ -20,6 +20,7 @@ package org.apache.tubemq.server.master.metamanage.metastore.dao.entity;
 import java.util.Date;
 import java.util.Objects;
 import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.SettingValidUtils;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.statusdef.EnableStatus;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
@@ -36,17 +37,22 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
     private int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
     private EnableStatus authCtrlStatus = EnableStatus.STATUS_UNDEFINE;
     private int maxMsgSizeInB = TBaseConstants.META_VALUE_UNDEFINED;
+    private int maxMsgSizeInMB = TBaseConstants.META_VALUE_UNDEFINED;
 
 
     public TopicCtrlEntity() {
         super();
     }
 
-    public TopicCtrlEntity(String topicName, int topicNameId, String createUser) {
+    public TopicCtrlEntity(String topicName, int topicNameId,
+                           int maxMsgSizeInMB, String createUser) {
         super(createUser, new Date());
         this.topicName = topicName;
         this.topicNameId = topicNameId;
         this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
+        this.maxMsgSizeInB =
+                SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB);
+        this.maxMsgSizeInMB = maxMsgSizeInMB;
     }
 
     public TopicCtrlEntity(String topicName, int topicNameId,
@@ -56,7 +62,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
         super(dataVersionId, createUser, createDate, modifyUser, modifyDate);
         this.topicName = topicName;
         this.topicNameId = topicNameId;
-        this.maxMsgSizeInB = maxMsgSizeInB;
+        this.fillMaxMsgSize(maxMsgSizeInB);
         if (enableAuth) {
             this.authCtrlStatus = EnableStatus.STATUS_ENABLE;
         } else {
@@ -69,7 +75,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
                 bdbEntity.getCreateUser(), bdbEntity.getCreateDate());
         this.topicName = bdbEntity.getTopicName();
         this.topicNameId = bdbEntity.getTopicId();
-        this.maxMsgSizeInB = bdbEntity.getMaxMsgSize();
+        this.fillMaxMsgSize(maxMsgSizeInB);
         if (bdbEntity.isEnableAuthControl()) {
             this.authCtrlStatus = EnableStatus.STATUS_ENABLE;
         } else {
@@ -120,8 +126,8 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
         return maxMsgSizeInB;
     }
 
-    public void setMaxMsgSizeInB(int maxMsgSizeInB) {
-        this.maxMsgSizeInB = maxMsgSizeInB;
+    public int getMaxMsgSizeInMB() {
+        return maxMsgSizeInMB;
     }
 
     public int getTopicId() {
@@ -133,6 +139,45 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
     }
 
     /**
+     * update subclass field values
+     *
+     * @return if changed
+     */
+    public boolean updModifyInfo(int topicNameId,
+                                 int newMaxMsgSizeMB,
+                                 EnableStatus authCtrlStatus) {
+        boolean changed = false;
+        // check and set topicNameId info
+        if (topicNameId != TBaseConstants.META_VALUE_UNDEFINED
+                && this.topicNameId != topicNameId) {
+            changed = true;
+            this.topicNameId = topicNameId;
+        }
+        // check and set modified field
+        if (newMaxMsgSizeMB != TBaseConstants.META_VALUE_UNDEFINED) {
+            int newMaxMsgSizeB =
+                    SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(newMaxMsgSizeMB);
+            if (this.maxMsgSizeInB != newMaxMsgSizeB) {
+                changed = true;
+                this.maxMsgSizeInB = newMaxMsgSizeB;
+                this.maxMsgSizeInMB = newMaxMsgSizeMB;
+            }
+        }
+        // check and set authCtrlStatus info
+        if (authCtrlStatus != null
+                && authCtrlStatus != EnableStatus.STATUS_UNDEFINE
+                && this.authCtrlStatus != authCtrlStatus) {
+            changed = true;
+            this.authCtrlStatus = authCtrlStatus;
+        }
+
+        if (changed) {
+            updSerialId();
+        }
+        return changed;
+    }
+
+    /**
      * Check whether the specified query item value matches
      * Allowed query items:
      *   topicName, maxMsgSizeInB, authCtrlStatus
@@ -169,20 +214,16 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
     public StringBuilder toWebJsonStr(StringBuilder sBuilder,
                                       boolean isLongName,
                                       boolean fullFormat) {
-        int tmpMsgSizeInMB = maxMsgSizeInB;
-        if (maxMsgSizeInB != TBaseConstants.META_VALUE_UNDEFINED) {
-            tmpMsgSizeInMB /= TBaseConstants.META_MB_UNIT_SIZE;
-        }
         if (isLongName) {
             sBuilder.append("{\"topicName\":\"").append(topicName).append("\"")
                     .append(",\"topicNameId\":").append(topicNameId)
                     .append(",\"enableAuthControl\":").append(authCtrlStatus.isEnable())
-                    .append(",\"maxMsgSizeInMB\":").append(tmpMsgSizeInMB);
+                    .append(",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB);
         } else {
             sBuilder.append("{\"topic\":\"").append(topicName).append("\"")
                     .append(",\"topicId\":").append(topicNameId)
                     .append(",\"acEn\":").append(authCtrlStatus.isEnable())
-                    .append(",\"mxMsgInMB\":").append(tmpMsgSizeInMB);
+                    .append(",\"mxMsgInMB\":").append(maxMsgSizeInMB);
         }
         super.toWebJsonStr(sBuilder, isLongName);
         if (fullFormat) {
@@ -191,6 +232,12 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
         return sBuilder;
     }
 
+    private void fillMaxMsgSize(int maxMsgSizeInB) {
+        this.maxMsgSizeInB = maxMsgSizeInB;
+        this.maxMsgSizeInMB =
+                maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE;
+    }
+
     /**
      * check if subclass fields is equals
      *
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
similarity index 75%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployConfEntity.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index 2802d35..e502854 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -30,7 +30,7 @@ import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
  * store the topic configure setting
  *
  */
-public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
+public class TopicDeployEntity extends BaseEntity implements Cloneable {
 
     private String recordKey = "";
     private String topicName = "";
@@ -44,28 +44,34 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
     private TopicPropGroup topicProps = new TopicPropGroup();
 
 
-    public TopicDeployConfEntity() {
+    public TopicDeployEntity() {
         super();
     }
 
-    public TopicDeployConfEntity(String topicName, int topicId, int brokerId,
-                                 String brokerIp, int brokerPort,
-                                 TopicPropGroup topicProps, TopicStatus deployStatus,
-                                 long dataVersionId, String createUser,
-                                 Date createDate, String modifyUser, Date modifyDate) {
+    public TopicDeployEntity(long dataVerId, String createUser, Date createDate) {
+        super(dataVerId, createUser, createDate);
+    }
+
+    public TopicDeployEntity(String topicName, int topicId, int brokerId,
+                             String brokerIp, int brokerPort,
+                             TopicPropGroup topicProps, TopicStatus deployStatus,
+                             long dataVersionId, String createUser,
+                             Date createDate, String modifyUser, Date modifyDate) {
         super(dataVersionId, createUser, createDate, modifyUser, modifyDate);
-        setTopicDeployInfo(brokerId, brokerIp, brokerPort, topicName, topicId);
+        setTopicDeployInfo(brokerId, brokerIp, brokerPort, topicName);
+        this.topicNameId = topicId;
         this.deployStatus = deployStatus;
         this.topicProps = topicProps;
     }
 
-    public TopicDeployConfEntity(BdbTopicConfEntity bdbEntity) {
+    public TopicDeployEntity(BdbTopicConfEntity bdbEntity) {
         super(bdbEntity.getDataVerId(),
                 bdbEntity.getCreateUser(), bdbEntity.getCreateDate(),
                 bdbEntity.getModifyUser(), bdbEntity.getModifyDate());
         setTopicDeployInfo(bdbEntity.getBrokerId(),
                 bdbEntity.getBrokerIp(), bdbEntity.getBrokerPort(),
-                bdbEntity.getTopicName(), bdbEntity.getTopicId());
+                bdbEntity.getTopicName());
+        this.topicNameId = bdbEntity.getTopicId();
         this.deployStatus = TopicStatus.valueOf(bdbEntity.getTopicStatusId());
         this.topicProps =
                 new TopicPropGroup(bdbEntity.getNumTopicStores(), bdbEntity.getNumPartitions(),
@@ -97,17 +103,17 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
         return bdbEntity;
     }
 
-    public void setTopicDeployInfo(int brokerId, String brokerIp, int brokerPort,
-                                   String topicName, int topicId) {
+    public void setTopicDeployInfo(int brokerId, String brokerIp,
+                                   int brokerPort, String topicName) {
         this.brokerId = brokerId;
         this.brokerIp = brokerIp;
         this.brokerPort = brokerPort;
         this.topicName = topicName;
-        this.topicNameId = topicId;
         this.recordKey = KeyBuilderUtils.buildTopicConfRecKey(brokerId, topicName);
         this.brokerAddress = KeyBuilderUtils.buildAddressInfo(brokerIp, brokerPort);
     }
 
+
     public String getRecordKey() {
         return recordKey;
     }
@@ -136,6 +142,22 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
         this.topicProps = topicProps;
     }
 
+    public int getNumTopicStores() {
+        return this.topicProps.getNumTopicStores();
+    }
+
+    public int getNumPartitions() {
+        return this.topicProps.getNumPartitions();
+    }
+
+    public boolean isAcceptPublish() {
+        return this.topicProps.isAcceptPublish();
+    }
+
+    public boolean isAcceptSubscribe() {
+        return this.topicProps.isAcceptSubscribe();
+    }
+
     public int getTopicId() {
         return topicNameId;
     }
@@ -178,12 +200,59 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
     }
 
     /**
+     * update subclass field values
+     *
+     * @return if changed
+     */
+    public boolean updModifyInfo(int topicNameId, int brokerPort, String brokerIp,
+                                 TopicStatus deployStatus, TopicPropGroup topicProps) {
+        boolean changed = false;
+        // check and set topicNameId info
+        if (topicNameId != TBaseConstants.META_VALUE_UNDEFINED
+                && this.topicNameId != topicNameId) {
+            changed = true;
+            this.topicNameId = topicNameId;
+        }
+        // check and set brokerPort info
+        if (brokerPort != TBaseConstants.META_VALUE_UNDEFINED
+                && this.brokerPort != brokerPort) {
+            changed = true;
+            this.brokerPort = brokerPort;
+        }
+        // check and set filterCondStr info
+        if (TStringUtils.isNotBlank(brokerIp)
+                && !this.brokerIp.equals(brokerIp)) {
+            changed = true;
+            this.brokerIp = brokerIp;
+        }
+        // check and set deployStatus info
+        if (deployStatus != null
+                && deployStatus != TopicStatus.STATUS_TOPIC_UNDEFINED
+                && this.deployStatus != deployStatus) {
+            changed = true;
+            this.deployStatus = deployStatus;
+        }
+        // check and set topicProps info
+        if (topicProps != null
+                && !topicProps.isDataEquals(this.topicProps)) {
+            changed = true;
+            this.topicProps = topicProps;
+        }
+        if (changed) {
+            updSerialId();
+            this.brokerAddress =
+                    KeyBuilderUtils.buildAddressInfo(this.brokerIp, this.brokerPort);
+        }
+        return changed;
+    }
+
+    /**
      * Check whether the specified query item value matches
      * Allowed query items:
      *   brokerId, topicId, topicName, topicStatus
      * @return true: matched, false: not match
      */
-    public boolean isMatched(TopicDeployConfEntity target) {
+    public boolean isMatched(TopicDeployEntity target) {
         if (target == null) {
             return true;
         }
@@ -246,7 +315,7 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
      * @param other  check object
      * @return if equals
      */
-    public boolean isDataEquals(TopicDeployConfEntity other) {
+    public boolean isDataEquals(TopicDeployEntity other) {
         return brokerId == other.brokerId
                 && brokerPort == other.brokerPort
                 && topicNameId == other.topicNameId
@@ -263,13 +332,13 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
         if (this == o) {
             return true;
         }
-        if (!(o instanceof TopicDeployConfEntity)) {
+        if (!(o instanceof TopicDeployEntity)) {
             return false;
         }
         if (!super.equals(o)) {
             return false;
         }
-        TopicDeployConfEntity that = (TopicDeployConfEntity) o;
+        TopicDeployEntity that = (TopicDeployEntity) o;
         return isDataEquals(that);
     }
 
@@ -280,9 +349,9 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
     }
 
     @Override
-    public TopicDeployConfEntity clone() {
+    public TopicDeployEntity clone() {
         try {
-            TopicDeployConfEntity copy = (TopicDeployConfEntity) super.clone();
+            TopicDeployEntity copy = (TopicDeployEntity) super.clone();
             if (copy.getTopicProps() != null) {
                 copy.setTopicProps(getTopicProps().clone());
             }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
similarity index 58%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployConfigMapper.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index 832ff2f..b4a1233 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -22,15 +22,15 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 
 
 
-public interface TopicDeployConfigMapper extends AbstractMapper {
+public interface TopicDeployMapper extends AbstractMapper {
 
-    boolean addTopicConf(TopicDeployConfEntity entity, ProcessResult result);
+    boolean addTopicConf(TopicDeployEntity entity, ProcessResult result);
 
-    boolean updTopicConf(TopicDeployConfEntity entity, ProcessResult result);
+    boolean updTopicConf(TopicDeployEntity entity, ProcessResult result);
 
     boolean delTopicConf(String recordKey, ProcessResult result);
 
@@ -39,14 +39,20 @@ public interface TopicDeployConfigMapper extends AbstractMapper {
 
     boolean hasConfiguredTopics(int brokerId);
 
-    List<TopicDeployConfEntity> getTopicConf(TopicDeployConfEntity qryEntity);
+    List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity);
 
-    TopicDeployConfEntity getTopicConfByeRecKey(String recordKey);
+    TopicDeployEntity getTopicConf(int brokerId, String topicName);
 
-    Map<String/* topicName */, List<TopicDeployConfEntity>> getTopicConfMap(
-            TopicDeployConfEntity qryEntity);
+    TopicDeployEntity getTopicConfByeRecKey(String recordKey);
 
-    Map<String/* topicName */, List<TopicDeployConfEntity>> getTopicConfMapByTopicAndBrokerIds(
+    Map<String, List<TopicDeployEntity>> getTopicConfMap(Set<String> topicNameSet,
+                                                         Set<Integer> brokerIdSet,
+                                                         TopicDeployEntity qryEntity);
+
+    Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+                                                                Set<Integer> brokerIdSet);
+
+    Map<String/* topicName */, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
             Set<String> topicSet, Set<Integer> brokerIdSet);
 
     Map<Integer/* brokerId */, Set<String>> getConfiguredTopicInfo(Set<Integer> brokerIdSet);
@@ -55,5 +61,5 @@ public interface TopicDeployConfigMapper extends AbstractMapper {
 
     Set<String> getConfiguredTopicSet();
 
-    Map<String, TopicDeployConfEntity> getConfiguredTopicInfo(int brokerId);
+    Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
similarity index 72%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployConfigMapperImpl.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
index 5b9dcae..5f6c6c8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
@@ -32,27 +32,28 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
+import org.apache.tubemq.corebase.utils.KeyBuilderUtils;
 import org.apache.tubemq.server.common.exception.LoadMetaException;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
 import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployConfigMapper;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 
-public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
+public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
 
     private static final Logger logger =
-            LoggerFactory.getLogger(BdbTopicDeployConfigMapperImpl.class);
+            LoggerFactory.getLogger(BdbTopicDeployMapperImpl.class);
 
     // Topic configure store
     private EntityStore topicConfStore;
     private PrimaryIndex<String/* recordKey */, BdbTopicConfEntity> topicConfIndex;
     // data cache
-    private ConcurrentHashMap<String/* recordKey */, TopicDeployConfEntity> topicConfCache =
+    private ConcurrentHashMap<String/* recordKey */, TopicDeployEntity> topicConfCache =
             new ConcurrentHashMap<>();
     private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashSet<String>>
             brokerIdCacheIndex = new ConcurrentHashMap<>();
@@ -65,7 +66,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
 
 
 
-    public BdbTopicDeployConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
+    public BdbTopicDeployMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
         topicConfStore = new EntityStore(repEnv,
                 TBDBStoreTables.BDB_TOPIC_CONFIG_STORE_NAME, storeConfig);
         topicConfIndex =
@@ -96,7 +97,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
                     logger.warn("[BDB Impl] found Null data while loading topic configure!");
                     continue;
                 }
-                TopicDeployConfEntity memEntity = new TopicDeployConfEntity(bdbEntity);
+                TopicDeployEntity memEntity = new TopicDeployEntity(bdbEntity);
                 addOrUpdCacheRecord(memEntity);
                 count++;
             }
@@ -113,8 +114,8 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
     }
 
     @Override
-    public boolean addTopicConf(TopicDeployConfEntity memEntity, ProcessResult result) {
-        TopicDeployConfEntity curEntity =
+    public boolean addTopicConf(TopicDeployEntity memEntity, ProcessResult result) {
+        TopicDeployEntity curEntity =
                 topicConfCache.get(memEntity.getRecordKey());
         if (curEntity != null) {
             result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
@@ -131,8 +132,8 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
     }
 
     @Override
-    public boolean updTopicConf(TopicDeployConfEntity memEntity, ProcessResult result) {
-        TopicDeployConfEntity curEntity =
+    public boolean updTopicConf(TopicDeployEntity memEntity, ProcessResult result) {
+        TopicDeployEntity curEntity =
                 topicConfCache.get(memEntity.getRecordKey());
         if (curEntity == null) {
             result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
@@ -159,7 +160,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
 
     @Override
     public boolean delTopicConf(String recordKey, ProcessResult result) {
-        TopicDeployConfEntity curEntity =
+        TopicDeployEntity curEntity =
                 topicConfCache.get(recordKey);
         if (curEntity == null) {
             result.setSuccResult(null);
@@ -195,17 +196,17 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
     }
 
     @Override
-    public TopicDeployConfEntity getTopicConfByeRecKey(String recordKey) {
+    public TopicDeployEntity getTopicConfByeRecKey(String recordKey) {
         return topicConfCache.get(recordKey);
     }
 
     @Override
-    public List<TopicDeployConfEntity> getTopicConf(TopicDeployConfEntity qryEntity) {
-        List<TopicDeployConfEntity> retEntitys = new ArrayList<>();
+    public List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity) {
+        List<TopicDeployEntity> retEntitys = new ArrayList<>();
         if (qryEntity == null) {
             retEntitys.addAll(topicConfCache.values());
         } else {
-            for (TopicDeployConfEntity entity : topicConfCache.values()) {
+            for (TopicDeployEntity entity : topicConfCache.values()) {
                 if (entity.isMatched(qryEntity)) {
                     retEntitys.add(entity);
                 }
@@ -215,44 +216,121 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
     }
 
     @Override
-    public Map<String, List<TopicDeployConfEntity>> getTopicConfMap(TopicDeployConfEntity qryEntity) {
-        List<TopicDeployConfEntity> items;
-        Map<String, List<TopicDeployConfEntity>> retEntityMap = new HashMap<>();
-        if (qryEntity == null) {
-            for (TopicDeployConfEntity entity : topicConfCache.values()) {
-                items = retEntityMap.get(entity.getTopicName());
-                if (items == null) {
-                    items = new ArrayList<>();
-                    retEntityMap.put(entity.getTopicName(), items);
+    public TopicDeployEntity getTopicConf(int brokerId, String topicName) {
+        String recordKey =
+                KeyBuilderUtils.buildTopicConfRecKey(brokerId, topicName);
+        return topicConfCache.get(recordKey);
+    }
+
+    @Override
+    public Map<String, List<TopicDeployEntity>> getTopicConfMap(Set<String> topicNameSet,
+                                                                Set<Integer> brokerIdSet,
+                                                                TopicDeployEntity qryEntity) {
+        List<TopicDeployEntity> items;
+        Set<String> qryTopicKey = null;
+        ConcurrentHashSet<String> keySet;
+        Map<String, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
+        if (topicNameSet != null && !topicNameSet.isEmpty()) {
+            qryTopicKey = new HashSet<>();
+            for (String topicName : topicNameSet) {
+                keySet = topicNameCacheIndex.get(topicName);
+                if (keySet != null && !keySet.isEmpty()) {
+                    qryTopicKey.addAll(keySet);
                 }
-                items.add(entity);
             }
-        } else {
-            for (TopicDeployConfEntity entity : topicConfCache.values()) {
-                if (entity.isMatched(qryEntity)) {
-                    items = retEntityMap.get(entity.getTopicName());
-                    if (items == null) {
-                        items = new ArrayList<>();
-                        retEntityMap.put(entity.getTopicName(), items);
-                    }
-                    items.add(entity);
+        }
+        if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
+            if (qryTopicKey == null) {
+                qryTopicKey = new HashSet<>();
+            }
+            for (Integer brokerId : brokerIdSet) {
+                keySet = brokerIdCacheIndex.get(brokerId);
+                if (keySet != null && !keySet.isEmpty()) {
+                    qryTopicKey.addAll(keySet);
                 }
             }
         }
+        if (qryTopicKey == null) {
+            qryTopicKey = new HashSet<>(topicConfCache.keySet());
+        }
+        if (qryTopicKey.isEmpty()) {
+            return retEntityMap;
+        }
+        for (String recordKey: qryTopicKey) {
+            TopicDeployEntity entity = topicConfCache.get(recordKey);
+            if (entity == null
+                    || (qryEntity != null && !qryEntity.isMatched(entity))) {
+                continue;
+            }
+            items = retEntityMap.get(entity.getTopicName());
+            if (items == null) {
+                items = new ArrayList<>();
+                retEntityMap.put(entity.getTopicName(), items);
+            }
+            items.add(entity);
+        }
+        return retEntityMap;
+    }
+
+    @Override
+    public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(
+            Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+        List<TopicDeployEntity> items;
+        Set<String> qryTopicKey = null;
+        ConcurrentHashSet<String> keySet;
+        Map<Integer, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
+        if (topicNameSet != null && !topicNameSet.isEmpty()) {
+            qryTopicKey = new HashSet<>();
+            for (String topicName : topicNameSet) {
+                keySet = topicNameCacheIndex.get(topicName);
+                if (keySet != null && !keySet.isEmpty()) {
+                    qryTopicKey.addAll(keySet);
+                }
+            }
+        }
+        if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
+            if (qryTopicKey == null) {
+                qryTopicKey = new HashSet<>();
+            }
+            for (Integer brokerId : brokerIdSet) {
+                keySet = brokerIdCacheIndex.get(brokerId);
+                if (keySet != null && !keySet.isEmpty()) {
+                    qryTopicKey.addAll(keySet);
+                }
+            }
+        }
+        if (qryTopicKey == null) {
+            qryTopicKey = new HashSet<>(topicConfCache.keySet());
+        }
+        if (qryTopicKey.isEmpty()) {
+            return retEntityMap;
+        }
+        for (String recordKey: qryTopicKey) {
+            TopicDeployEntity entity = topicConfCache.get(recordKey);
+            if (entity == null) {
+                continue;
+            }
+            items = retEntityMap.get(entity.getBrokerId());
+            if (items == null) {
+                items = new ArrayList<>();
+                retEntityMap.put(entity.getBrokerId(), items);
+            }
+            items.add(entity);
+        }
         return retEntityMap;
     }
 
     @Override
-    public Map<String, List<TopicDeployConfEntity>> getTopicConfMapByTopicAndBrokerIds(
+    public Map<String, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
             Set<String> topicSet, Set<Integer> brokerIdSet) {
-        TopicDeployConfEntity tmpEntity;
-        List<TopicDeployConfEntity> itemLst;
+        TopicDeployEntity tmpEntity;
+        List<TopicDeployEntity> itemLst;
         ConcurrentHashSet<String> recSet;
         Set<String> hitKeys = new HashSet<>();
-        Map<String, List<TopicDeployConfEntity>> retEntityMap = new HashMap<>();
+        Map<String, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
         if (((topicSet == null) || (topicSet.isEmpty()))
                 && ((brokerIdSet == null) || (brokerIdSet.isEmpty()))) {
-            for (TopicDeployConfEntity entity : topicConfCache.values()) {
+            for (TopicDeployEntity entity : topicConfCache.values()) {
                 itemLst = retEntityMap.get(entity.getTopicName());
                 if (itemLst == null) {
                     itemLst = new ArrayList<>();
@@ -344,7 +422,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
                 continue;
             }
             for (String key : keySet) {
-                TopicDeployConfEntity entity = topicConfCache.get(key);
+                TopicDeployEntity entity = topicConfCache.get(key);
                 if (entity == null) {
                     continue;
                 }
@@ -367,9 +445,9 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
     }
 
     @Override
-    public Map<String, TopicDeployConfEntity> getConfiguredTopicInfo(int brokerId) {
-        TopicDeployConfEntity tmpEntity;
-        Map<String, TopicDeployConfEntity> retEntityMap = new HashMap<>();
+    public Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId) {
+        TopicDeployEntity tmpEntity;
+        Map<String, TopicDeployEntity> retEntityMap = new HashMap<>();
         ConcurrentHashSet<String> records = brokerIdCacheIndex.get(brokerId);
         if (records == null || records.isEmpty()) {
             return retEntityMap;
@@ -391,7 +469,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
      * @param result process result with old value
      * @return
      */
-    private boolean putTopicConfig2Bdb(TopicDeployConfEntity memEntity, ProcessResult result) {
+    private boolean putTopicConfig2Bdb(TopicDeployEntity memEntity, ProcessResult result) {
         BdbTopicConfEntity retData = null;
         BdbTopicConfEntity bdbEntity =
                 memEntity.buildBdbTopicConfEntity();
@@ -420,7 +498,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
     }
 
     private void delCacheRecord(String recordKey) {
-        TopicDeployConfEntity curEntity =
+        TopicDeployEntity curEntity =
                 topicConfCache.remove(recordKey);
         if (curEntity == null) {
             return;
@@ -452,7 +530,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
         }
     }
 
-    private void addOrUpdCacheRecord(TopicDeployConfEntity entity) {
+    private void addOrUpdCacheRecord(TopicDeployEntity entity) {
         topicConfCache.put(entity.getRecordKey(), entity);
         // add topic index map
         ConcurrentHashSet<String> keySet =
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/TopicProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/TopicProcessResult.java
new file mode 100644
index 0000000..a8bbd62
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/TopicProcessResult.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.web.handler;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+
+
+public class TopicProcessResult extends ProcessResult {
+    private int brokerId = TBaseConstants.META_VALUE_UNDEFINED;
+    private String topicName = "";
+
+    public TopicProcessResult() {
+
+    }
+
+    public TopicProcessResult(int brokerId,
+                              String topicName,
+                              ProcessResult result) {
+        super(result);
+        this.brokerId = brokerId;
+        this.topicName = topicName;
+    }
+
+    public int getBrokerId() {
+        return brokerId;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
new file mode 100644
index 0000000..fe1501a
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
@@ -0,0 +1,520 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.web.handler;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.policies.FlowCtrlItem;
+import org.apache.tubemq.corebase.policies.FlowCtrlRuleHandler;
+import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
+
+
+
+public class WebAdminFlowRuleHandler extends AbstractWebHandler {
+
+    private static final String blankFlowCtrlRules = "[]";
+    private static final List<Integer> allowedPriorityVal = Arrays.asList(1, 2, 3);
+    private static final Set<String> rsvGroupNameSet =
+            new HashSet<>(Arrays.asList(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL));
+
+
+    public WebAdminFlowRuleHandler(TMaster master) {
+        super(master);
+    }
+
+    @Override
+    public void registerWebApiMethod() {
+        // register query method
+        registerQueryWebMethod("admin_query_def_flow_control_rule",
+                "adminQueryDefGroupFlowCtrlRule");
+        registerQueryWebMethod("admin_query_group_flow_control_rule",
+                "adminQuerySpecGroupFlowCtrlRule");
+        // register modify method
+        registerModifyWebMethod("admin_set_def_flow_control_rule",
+                "adminSetDefGroupFlowCtrlRule");
+        registerModifyWebMethod("admin_set_group_flow_control_rule",
+                "adminSetSpecGroupFlowCtrlRule");
+        registerModifyWebMethod("admin_rmv_def_flow_control_rule",
+                "adminDelDefGroupFlowCtrlRuleStatus");
+        registerModifyWebMethod("admin_rmv_group_flow_control_rule",
+                "adminDelSpecGroupFlowCtrlRuleStatus");
+        registerModifyWebMethod("admin_upd_def_flow_control_rule",
+                "adminModDefGroupFlowCtrlRuleStatus");
+        registerModifyWebMethod("admin_upd_group_flow_control_rule",
+                "adminModSpecGroupFlowCtrlRuleStatus");
+    }
+
+    public StringBuilder adminQueryDefGroupFlowCtrlRule(HttpServletRequest req) {
+        return innQueryGroupFlowCtrlRule(req, true);
+    }
+
+    public StringBuilder adminQuerySpecGroupFlowCtrlRule(HttpServletRequest req) {
+        return innQueryGroupFlowCtrlRule(req, false);
+    }
+
+    public StringBuilder adminSetDefGroupFlowCtrlRule(HttpServletRequest req) {
+        return innSetFlowControlRule(req, true);
+    }
+
+    public StringBuilder adminSetSpecGroupFlowCtrlRule(HttpServletRequest req) {
+        return innSetFlowControlRule(req, false);
+    }
+
+    public StringBuilder adminDelDefGroupFlowCtrlRuleStatus(HttpServletRequest req) {
+        return innDelGroupFlowCtrlRuleStatus(req, true);
+    }
+
+    public StringBuilder adminDelSpecGroupFlowCtrlRuleStatus(HttpServletRequest req) {
+        return innDelGroupFlowCtrlRuleStatus(req, false);
+    }
+
+    public StringBuilder adminModDefGroupFlowCtrlRuleStatus(HttpServletRequest req) {
+        return innModGroupFlowCtrlRuleStatus(req, true);
+    }
+
+    public StringBuilder adminModSpecGroupFlowCtrlRuleStatus(HttpServletRequest req) {
+        return innModGroupFlowCtrlRuleStatus(req, false);
+    }
+
+    /**
+     * add flow control rule
+     *
+     * @param req
+     * @param do4DefFlowCtrl
+     * @return
+     */
+    private StringBuilder innSetFlowControlRule(HttpServletRequest req,
+                                                boolean do4DefFlowCtrl) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(512);
+        // valid operation authorize info
+        if (!WebParameterUtils.validReqAuthorizeInfo(req,
+                WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        // get createUser info
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.CREATEUSER, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        String createUser = (String) result.retData1;
+        // check and get create date
+        if (!WebParameterUtils.getDateParameter(req,
+                WebFieldDef.CREATEDATE, false, new Date(), result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Date createDate = (Date) result.retData1;
+        // get rule required status info
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.STATUSID, false, 0, 0, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        int statusId = (int) result.retData1;
+        // get and valid priority info
+        if (!getQryPriorityIdWithCheck(req, false, 301, 101, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        int qryPriorityId = (int) result.retData1;
+        // get group name info
+        if (!getGroupNameWithCheck(req, true, do4DefFlowCtrl, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<String> batchGroupNames = (Set<String>) result.retData1;
+        // get and flow control rule info
+        int ruleCnt = getAndCheckFlowRules(req, blankFlowCtrlRules, result);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        String flowCtrlInfo = (String) result.retData1;
+        try {
+            // add flow control to bdb
+            for (String groupName : batchGroupNames) {
+                if (groupName.equals(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL)) {
+                    brokerConfManager.confAddBdbGroupFlowCtrl(
+                            new BdbGroupFlowCtrlEntity(flowCtrlInfo,
+                                    statusId, ruleCnt, qryPriorityId, "",
+                                    false, createUser, createDate));
+                } else {
+                    brokerConfManager.confAddBdbGroupFlowCtrl(
+                            new BdbGroupFlowCtrlEntity(groupName,
+                                    flowCtrlInfo, statusId, ruleCnt, qryPriorityId, "",
+                                    false, createUser, createDate));
+                }
+            }
+            WebParameterUtils.buildSuccessResult(sBuilder);
+        } catch (Exception e) {
+            WebParameterUtils.buildFailResult(sBuilder, e.getMessage());
+        }
+        return sBuilder;
+    }
+
+    /**
+     * delete flow control rule
+     *
+     * @param req
+     * @param do4DefFlowCtrl
+     * @return
+     */
+    private StringBuilder innDelGroupFlowCtrlRuleStatus(HttpServletRequest req,
+                                                        boolean do4DefFlowCtrl) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(512);
+        // valid operation authorize info
+        if (!WebParameterUtils.validReqAuthorizeInfo(req,
+                WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        // get modifyUser info
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.CREATEUSER, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        String modifyUser = (String) result.retData1;
+        // check and get modifyDate date
+        if (!WebParameterUtils.getDateParameter(req,
+                WebFieldDef.CREATEDATE, false, new Date(), result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Date modifyDate = (Date) result.retData1;
+        // get group name info
+        if (!getGroupNameWithCheck(req, true, do4DefFlowCtrl, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<String> batchGroupNames = (Set<String>) result.retData1;
+        try {
+            brokerConfManager.confDeleteBdbGroupFlowCtrl(batchGroupNames);
+            WebParameterUtils.buildSuccessResult(sBuilder);
+        } catch (Exception e) {
+            WebParameterUtils.buildFailResult(sBuilder, e.getMessage());
+        }
+        return sBuilder;
+    }
+
+    /**
+     * modify flow control rule
+     *
+     * @param req
+     * @param do4DefFlowCtrl
+     * @return
+     */
+    private StringBuilder innModGroupFlowCtrlRuleStatus(HttpServletRequest req,
+                                                        boolean do4DefFlowCtrl) {
+        // #lizard forgives
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(512);
+        // valid operation authorize info
+        if (!WebParameterUtils.validReqAuthorizeInfo(req,
+                WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        // get modifyUser info
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.CREATEUSER, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        String modifyUser = (String) result.retData1;
+        // check and get modifyDate date
+        if (!WebParameterUtils.getDateParameter(req,
+                WebFieldDef.CREATEDATE, false, new Date(), result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Date modifyDate = (Date) result.retData1;
+        // get group name info
+        if (!getGroupNameWithCheck(req, true, do4DefFlowCtrl, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<String> batchGroupNames = (Set<String>) result.retData1;
+        // get rule required status info
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.STATUSID, false,
+                TBaseConstants.META_VALUE_UNDEFINED, 0, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        int statusId = (int) result.retData1;
+        // get and flow control rule info
+        int ruleCnt = getAndCheckFlowRules(req, null, result);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        String newFlowCtrlInfo = (String) result.retData1;
+        // get and valid priority info
+        if (!getQryPriorityIdWithCheck(req, false,
+                TBaseConstants.META_VALUE_UNDEFINED, 101, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        int qryPriorityId = (int) result.retData1;
+        try {
+            boolean foundChange;
+            for (String groupName : batchGroupNames) {
+                // check if record changed
+                BdbGroupFlowCtrlEntity oldEntity =
+                        brokerConfManager.getBdbGroupFlowCtrl(groupName);
+                if (oldEntity != null) {
+                    foundChange = false;
+                    BdbGroupFlowCtrlEntity newGroupFlowCtrlEntity =
+                            new BdbGroupFlowCtrlEntity(oldEntity.getGroupName(),
+                                    oldEntity.getFlowCtrlInfo(), oldEntity.getStatusId(),
+                                    oldEntity.getRuleCnt(), oldEntity.getAttributes(),
+                                    oldEntity.getSsdTranslateId(), oldEntity.isNeedSSDProc(),
+                                    oldEntity.getCreateUser(), oldEntity.getCreateDate());
+                    if (statusId != TBaseConstants.META_VALUE_UNDEFINED
+                            && statusId != oldEntity.getStatusId()) {
+                        foundChange = true;
+                        newGroupFlowCtrlEntity.setStatusId(statusId);
+                    }
+                    if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
+                            && qryPriorityId != oldEntity.getQryPriorityId()) {
+                        foundChange = true;
+                        newGroupFlowCtrlEntity.setQryPriorityId(qryPriorityId);
+                    }
+                    if (TStringUtils.isNotBlank(newFlowCtrlInfo)
+                            && !newFlowCtrlInfo.equals(oldEntity.getFlowCtrlInfo())) {
+                        foundChange = true;
+                        newGroupFlowCtrlEntity.setFlowCtrlInfo(ruleCnt, newFlowCtrlInfo);
+                    }
+                    // update record if found change
+                    if (foundChange) {
+                        try {
+                            newGroupFlowCtrlEntity.setModifyInfo(modifyUser, modifyDate);
+                            brokerConfManager.confUpdateBdbGroupFlowCtrl(newGroupFlowCtrlEntity);
+                        } catch (Throwable ee) {
+                            //
+                        }
+                    }
+                }
+            }
+            WebParameterUtils.buildSuccessResult(sBuilder);
+        } catch (Exception e) {
+            WebParameterUtils.buildFailResult(sBuilder, e.getMessage());
+        }
+        return sBuilder;
+    }
+
+    /**
+     * query flow control rule
+     *
+     * @param req
+     * @param do4DefFlowCtrl
+     * @return
+     */
+    private StringBuilder innQueryGroupFlowCtrlRule(HttpServletRequest req,
+                                                    boolean do4DefFlowCtrl) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(512);
+        BdbGroupFlowCtrlEntity bdbGroupFlowCtrlEntity = new BdbGroupFlowCtrlEntity();
+        // get modifyUser info
+        WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.CREATEUSER, false, null, result);
+        bdbGroupFlowCtrlEntity.setCreateUser((String) result.retData1);
+        // get status id info
+        WebParameterUtils.getIntParamValue(req, WebFieldDef.STATUSID, false,
+                TBaseConstants.META_VALUE_UNDEFINED, 0, result);
+        bdbGroupFlowCtrlEntity.setStatusId((int) result.retData1);
+        // get and valid priority info
+        getQryPriorityIdWithCheck(req, false,
+                TBaseConstants.META_VALUE_UNDEFINED, 101, result);
+        bdbGroupFlowCtrlEntity.setQryPriorityId((int) result.retData1);
+        getGroupNameWithCheck(req, false, do4DefFlowCtrl, result);
+        Set<String> batchGroupNames = (Set<String>) result.retData1;
+        // query group flow ctrl infos
+        List<BdbGroupFlowCtrlEntity> webGroupFlowCtrlEntities =
+                brokerConfManager.confGetBdbGroupFlowCtrl(bdbGroupFlowCtrlEntity);
+        int totalCnt = 0;
+        boolean found = false;
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+        if (do4DefFlowCtrl) {
+            for (BdbGroupFlowCtrlEntity entity : webGroupFlowCtrlEntities) {
+                if (entity.getGroupName().equals(
+                        TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL)) {
+                    if (totalCnt++ > 0) {
+                        sBuilder.append(",");
+                    }
+                    sBuilder = entity.toJsonString(sBuilder);
+                    break;
+                }
+            }
+        } else {
+            for (BdbGroupFlowCtrlEntity entity : webGroupFlowCtrlEntities) {
+                if (entity.getGroupName().equals(
+                        TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL)) {
+                    continue;
+                }
+                found = false;
+                for (String tmpGroupName : batchGroupNames) {
+                    if (entity.getGroupName().equals(tmpGroupName)) {
+                        found = true;
+                        break;
+                    }
+                }
+                if (found) {
+                    if (totalCnt++ > 0) {
+                        sBuilder.append(",");
+                    }
+                    sBuilder = entity.toJsonString(sBuilder);
+                }
+            }
+        }
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
+        return sBuilder;
+    }
+
+    // translate rule info to json format string
+    private int getAndCheckFlowRules(HttpServletRequest req,
+                                     String defValue,
+                                     ProcessResult result) {
+        int ruleCnt = 0;
+        StringBuilder strBuffer = new StringBuilder(512);
+        // get parameter value
+        String paramValue = req.getParameter(WebFieldDef.FLOWCTRLSET.name);
+        if (paramValue == null) {
+            paramValue = req.getParameter(WebFieldDef.FLOWCTRLSET.shortName);
+        }
+        if (TStringUtils.isBlank(paramValue)) {
+            result.setSuccResult(defValue);
+            return ruleCnt;
+        }
+        strBuffer.append("[");
+        paramValue = paramValue.trim();
+        List<Integer> ruleTypes = Arrays.asList(0, 1, 2, 3);
+        FlowCtrlRuleHandler flowCtrlRuleHandler =
+                new FlowCtrlRuleHandler(true);
+        Map<Integer, List<FlowCtrlItem>> flowCtrlItemMap;
+        try {
+            flowCtrlItemMap =
+                    flowCtrlRuleHandler.parseFlowCtrlInfo(paramValue);
+        } catch (Throwable e) {
+            result.setFailResult(new StringBuilder(512)
+                    .append("Parse parameter ").append(WebFieldDef.FLOWCTRLSET.name)
+                    .append(" failure: '").append(e.toString()).toString());
+            return 0;
+        }
+        for (Integer typeId : ruleTypes) {
+            if (typeId != null) {
+                int rules = 0;
+                List<FlowCtrlItem> flowCtrlItems = flowCtrlItemMap.get(typeId);
+                if (flowCtrlItems != null) {
+                    if (ruleCnt++ > 0) {
+                        strBuffer.append(",");
+                    }
+                    strBuffer.append("{\"type\":").append(typeId.intValue()).append(",\"rule\":[");
+                    for (FlowCtrlItem flowCtrlItem : flowCtrlItems) {
+                        if (flowCtrlItem != null) {
+                            if (rules++ > 0) {
+                                strBuffer.append(",");
+                            }
+                            strBuffer = flowCtrlItem.toJsonString(strBuffer);
+                        }
+                    }
+                    strBuffer.append("]}");
+                }
+            }
+        }
+        strBuffer.append("]");
+        result.setSuccResult(strBuffer.toString());
+        return ruleCnt;
+    }
+
+    private boolean getGroupNameWithCheck(HttpServletRequest req, boolean required,
+                                          boolean do4DefFlowCtrl, ProcessResult result) {
+        if (do4DefFlowCtrl) {
+            result.setSuccResult(rsvGroupNameSet);
+            return true;
+        }
+        // get group list
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, required, null, result)) {
+            return result.success;
+        }
+        Set<String> inGroupSet = (Set<String>) result.retData1;
+        for (String rsvGroup : rsvGroupNameSet) {
+            if (inGroupSet.contains(rsvGroup)) {
+                result.setFailResult(new StringBuilder(512)
+                        .append("Illegal value in ").append(WebFieldDef.COMPSGROUPNAME.name)
+                        .append(" parameter: '").append(rsvGroup)
+                        .append("' is a system reserved value!").toString());
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean getQryPriorityIdWithCheck(HttpServletRequest req, boolean required,
+                                              int defValue, int minValue,
+                                              ProcessResult result) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.QRYPRIORITYID, required,
+                defValue, minValue, result)) {
+            return result.success;
+        }
+        int qryPriorityId = (int) result.retData1;
+        if (qryPriorityId > 303 || qryPriorityId < 101) {
+            result.setFailResult(new StringBuilder(512)
+                    .append("Illegal value in ").append(WebFieldDef.QRYPRIORITYID.name)
+                    .append(" parameter: ").append(WebFieldDef.QRYPRIORITYID.name)
+                    .append(" value must be greater than or equal")
+                    .append(" to 101 and less than or equal to 303!").toString());
+            return false;
+        }
+        if (!allowedPriorityVal.contains(qryPriorityId % 100)) {
+            result.setFailResult(new StringBuilder(512)
+                    .append("Illegal value in ").append(WebFieldDef.QRYPRIORITYID.name)
+                    .append(" parameter: the units of ").append(WebFieldDef.QRYPRIORITYID.name)
+                    .append(" must in ").append(allowedPriorityVal).toString());
+            return false;
+        }
+        if (!allowedPriorityVal.contains(qryPriorityId / 100)) {
+            result.setFailResult(new StringBuilder(512)
+                    .append("Illegal value in ").append(WebFieldDef.QRYPRIORITYID.name)
+                    .append(" parameter: the hundreds of ").append(WebFieldDef.QRYPRIORITYID.name)
+                    .append(" must in ").append(allowedPriorityVal).toString());
+            return false;
+        }
+        return true;
+    }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
index f11e3b9..4779674 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
@@ -39,7 +39,7 @@ import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -207,7 +207,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
         int totalCnt = 0;
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
         for (BrokerConfEntity entity : qryResult.values()) {
-            Map<String, TopicDeployConfEntity> topicConfEntityMap =
+            Map<String, TopicDeployEntity> topicConfEntityMap =
                     metaDataManager.getBrokerTopicConfEntitySet(entity.getBrokerId());
             if (!isValidRecord(topicNameSet, isInclude, topicStatus, topicConfEntityMap)) {
                 continue;
@@ -518,7 +518,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
         // check broker configure status
         List<BrokerProcessResult> retInfo = new ArrayList<>();
         Map<Integer, BrokerConfEntity> needDelMap = new HashMap<>();
-        Map<String, TopicDeployConfEntity> topicConfigMap;
+        Map<String, TopicDeployEntity> topicConfigMap;
         for (BrokerConfEntity entity : qryResult.values()) {
             if (entity == null) {
                 continue;
@@ -539,7 +539,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
             }
             boolean isMatched = true;
             if (isReservedData) {
-                for (Map.Entry<String, TopicDeployConfEntity> entry : topicConfigMap.entrySet()) {
+                for (Map.Entry<String, TopicDeployEntity> entry : topicConfigMap.entrySet()) {
                     if (entry.getValue() == null) {
                         continue;
                     }
@@ -579,7 +579,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
                 continue;
             }
             if (isReservedData) {
-                Map<String, TopicDeployConfEntity> brokerTopicConfMap =
+                Map<String, TopicDeployEntity> brokerTopicConfMap =
                         metaDataManager.getBrokerTopicConfEntitySet(entry.getBrokerId());
                 if (brokerTopicConfMap != null) {
                     metaDataManager.delBrokerTopicConfig(opTupleInfo.getF1(),
@@ -606,7 +606,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
      */
     private boolean isValidRecord(Set<String> qryTopicSet, Boolean isInclude,
                                   TopicStatus topicStatus,
-                                  Map<String, TopicDeployConfEntity> topicConfEntityMap) {
+                                  Map<String, TopicDeployEntity> topicConfEntityMap) {
         if ((topicConfEntityMap == null) || (topicConfEntityMap.isEmpty())) {
             if ((qryTopicSet.isEmpty() || !isInclude)
                     && topicStatus == TopicStatus.STATUS_TOPIC_UNDEFINED) {
@@ -639,7 +639,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
             }
         }
         // second check topic status if match
-        for (TopicDeployConfEntity topicConfEntity : topicConfEntityMap.values()) {
+        for (TopicDeployEntity topicConfEntity : topicConfEntityMap.values()) {
             if (topicConfEntity.getDeployStatus() == topicStatus) {
                 return true;
             }
@@ -656,12 +656,12 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
      * @return
      */
     private StringBuilder addTopicInfo(Boolean withTopic, StringBuilder sBuilder,
-                                       Map<String, TopicDeployConfEntity> topicConfEntityMap) {
+                                       Map<String, TopicDeployEntity> topicConfEntityMap) {
         if (withTopic) {
             sBuilder.append(",\"topicSet\":[");
             int topicCount = 0;
             if (topicConfEntityMap != null) {
-                for (TopicDeployConfEntity topicEntity : topicConfEntityMap.values()) {
+                for (TopicDeployEntity topicEntity : topicConfEntityMap.values()) {
                     if (topicCount++ > 0) {
                         sBuilder.append(",");
                     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index edeb31a..532a05f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -37,7 +37,7 @@ import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
 import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
@@ -303,7 +303,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
         }
         Set<String> topicNameSet = (Set<String>) result.retData1;
         // query topic configure info
-        Map<String, List<TopicDeployConfEntity>> topicConfMap =
+        Map<String, List<TopicDeployEntity>> topicConfMap =
                 metaDataManager.getTopicConfMapByTopicAndBrokerIds(topicNameSet, brokerIds);
         TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
         int totalCount = 0;
@@ -316,7 +316,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
         boolean isAcceptSubscribe = false;
         boolean enableAuthControl = false;
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
-        for (Map.Entry<String, List<TopicDeployConfEntity>> entry : topicConfMap.entrySet()) {
+        for (Map.Entry<String, List<TopicDeployEntity>> entry : topicConfMap.entrySet()) {
             if (totalCount++ > 0) {
                 sBuilder.append(",");
             }
@@ -328,7 +328,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
             enableAuthControl = false;
             isAcceptPublish = false;
             isAcceptSubscribe = false;
-            for (TopicDeployConfEntity entity : entry.getValue()) {
+            for (TopicDeployEntity entity : entry.getValue()) {
                 brokerCount++;
                 TopicPropGroup topicProps = entity.getTopicProps();
                 totalCfgNumPartCount +=
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicConfHandler.java
new file mode 100644
index 0000000..a71fb12
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicConfHandler.java
@@ -0,0 +1,862 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.web.handler;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.tubemq.corebase.cluster.TopicInfo;
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class WebTopicConfHandler extends AbstractWebHandler {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(WebTopicConfHandler.class);
+
+    /**
+     * Constructor
+     *
+     * @param master tube master
+     */
+    public WebTopicConfHandler(TMaster master) {
+        super(master);
+    }
+
+
+
+    @Override
+    public void registerWebApiMethod() {
+        // register query method
+        registerQueryWebMethod("admin_query_topic_info",
+                "adminQueryTopicCfgEntityAndRunInfo");
+        registerQueryWebMethod("admin_query_broker_topic_config_info",
+                "adminQueryBrokerTopicCfgAndRunInfo");
+        registerQueryWebMethod("admin_query_topicName",
+                "adminQuerySimpleTopicName");
+        registerQueryWebMethod("admin_query_brokerId",
+                "adminQuerySimpleBrokerId");
+        // register modify method
+        registerModifyWebMethod("admin_add_new_topic_record",
+                "adminAddTopicEntityInfo");
+        registerModifyWebMethod("admin_bath_add_new_topic_record",
+                "adminBatchAddTopicEntityInfo");
+        registerModifyWebMethod("admin_modify_topic_info",
+                "adminModifyTopicEntityInfo");
+        registerModifyWebMethod("admin_delete_topic_info",
+                "adminDeleteTopicEntityInfo");
+        registerModifyWebMethod("admin_redo_deleted_topic_info",
+                "adminRedoDeleteTopicEntityInfo");
+        registerModifyWebMethod("admin_remove_topic_info",
+                "adminRemoveTopicEntityInfo");
+    }
+
+    /**
+     * Query topic info
+     *
+     * @param req
+     * @return
+     */
+    public StringBuilder adminQueryTopicCfgEntityAndRunInfo(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuffer = new StringBuilder(512);
+        TopicDeployEntity qryEntity = new TopicDeployEntity();
+        // get queried operation info, for createUser, modifyUser, dataVersionId
+        if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        // check and get topicName field
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        // check and get brokerId field
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, false, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+        // get brokerPort field
+        if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
+                false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        int brokerPort = (int) result.getRetData();
+        // get and valid topicProps info
+        if (!WebParameterUtils.getTopicPropInfo(req,
+                null, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
+        // get and valid TopicStatusId info
+        if (!WebParameterUtils.getTopicStatusParamValue(req,
+                false, TopicStatus.STATUS_TOPIC_UNDEFINED, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        TopicStatus topicStatus = (TopicStatus) result.getRetData();
+        qryEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+                TBaseConstants.META_VALUE_UNDEFINED, null, topicStatus, topicProps);
+        Map<String, List<TopicDeployEntity>> topicDeployInfoMap =
+                metaDataManager.getTopicConfEntityMap(topicNameSet, brokerIdSet, qryEntity);
+        // build query result
+        return buildQueryResult(sBuffer, true, topicDeployInfoMap);
+    }
+
+    private StringBuilder buildQueryResult(StringBuilder sBuffer,
+                                           boolean withAuthInfo,
+                                           Map<String, List<TopicDeployEntity>> topicDeployInfoMap) {
+        // build query result
+        int totalCnt = 0;
+        int totalCfgNumPartCount = 0;
+        int totalRunNumPartCount = 0;
+        boolean isSrvAcceptPublish = false;
+        boolean isSrvAcceptSubscribe = false;
+        boolean isAcceptPublish = false;
+        boolean isAcceptSubscribe = false;
+        ManageStatus manageStatus;
+        Tuple2<Boolean, Boolean> pubSubStatus;
+        TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) {
+            totalCfgNumPartCount = 0;
+            totalRunNumPartCount = 0;
+            isSrvAcceptPublish = false;
+            isSrvAcceptSubscribe = false;
+            isAcceptPublish = false;
+            isAcceptSubscribe = false;
+            TopicCtrlEntity ctrlEntity =
+                    metaDataManager.getTopicCtrlByTopicName(entry.getKey());
+            ctrlEntity.toWebJsonStr(sBuffer, true, false);
+            sBuffer.append(",\"deployInfo\":[");
+            int brokerCount = 0;
+            for (TopicDeployEntity entity : entry.getValue()) {
+                if (brokerCount++ > 0) {
+                    sBuffer.append(",");
+                }
+                totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
+                entity.toWebJsonStr(sBuffer, true, false);
+                sBuffer.append("\",\"runInfo\":{");
+                BrokerConfEntity brokerConfEntity =
+                        metaDataManager.getBrokerConfByBrokerId(entity.getBrokerId());
+                String strManageStatus = "-";
+                if (brokerConfEntity != null) {
+                    manageStatus = brokerConfEntity.getManageStatus();
+                    strManageStatus = manageStatus.getDescription();
+                    pubSubStatus = manageStatus.getPubSubStatus();
+                    isAcceptPublish = pubSubStatus.getF0();
+                    isAcceptSubscribe = pubSubStatus.getF1();
+                }
+                BrokerInfo broker = new BrokerInfo(entity.getBrokerId(),
+                        entity.getBrokerIp(), entity.getBrokerPort());
+                TopicInfo topicInfo =
+                        topicPSInfoManager.getTopicInfo(entity.getTopicName(), broker);
+                if (topicInfo == null) {
+                    sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
+                            .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
+                } else {
+                    if (isAcceptPublish) {
+                        sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish());
+                        if (topicInfo.isAcceptPublish()) {
+                            isSrvAcceptPublish = true;
+                        }
+                    } else {
+                        sBuffer.append("\"acceptPublish\":false");
+                    }
+                    if (isAcceptSubscribe) {
+                        sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe());
+                        if (topicInfo.isAcceptSubscribe()) {
+                            isSrvAcceptSubscribe = true;
+                        }
+                    } else {
+                        sBuffer.append(",\"acceptSubscribe\":false");
+                    }
+                    totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
+                    sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum())
+                            .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum())
+                            .append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\"");
+                }
+                sBuffer.append("}}");
+            }
+            sBuffer.append("],\"infoCount\":").append(brokerCount)
+                    .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
+                    .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
+                    .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
+                    .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount);
+            if (withAuthInfo) {
+                sBuffer.append(",\"authConsumeGroup\":[");
+                List<GroupConsumeCtrlEntity> groupCtrlInfoLst =
+                        metaDataManager.getConsumeCtrlByTopic(entry.getKey());
+                int countJ = 0;
+                for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
+                    if (countJ++ > 0) {
+                        sBuffer.append(",");
+                    }
+                    groupEntity.toWebJsonStr(sBuffer, true, false);
+                }
+                sBuffer.append("],\"groupCount\":").append(countJ);
+            }
+            sBuffer.append("}");
+        }
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+        return sBuffer;
+    }
+
+    /**
+     * Query broker topic config info
+     *
+     * @param req
+     * @return
+     */
+    public StringBuilder adminQueryBrokerTopicCfgAndRunInfo(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuffer = new StringBuilder(512);
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        // check and get brokerId field
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, false, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+        Map<Integer, List<TopicDeployEntity>> queryResult =
+                metaDataManager.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
+        // build query result
+        int dataCount = 0;
+        int totalStoreNum = 0;
+        int totalNumPartCount = 0;
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        for (Map.Entry<Integer, List<TopicDeployEntity>> entry : queryResult.entrySet()) {
+            if (entry.getKey() == null || entry.getValue() == null) {
+                continue;
+            }
+            BrokerConfEntity brokerConf =
+                    metaDataManager.getBrokerConfByBrokerId(entry.getKey());
+            if (brokerConf == null) {
+                continue;
+            }
+            BrokerSyncStatusInfo brokerSyncInfo =
+                    metaDataManager.getBrokerRunSyncStatusInfo(entry.getKey());
+            List<TopicDeployEntity> topicDeployInfo = entry.getValue();
+            // build return info item
+            if (dataCount++ > 0) {
+                sBuffer.append(",");
+            }
+            totalStoreNum = 0;
+            totalNumPartCount = 0;
+            sBuffer.append("{\"brokerId\":").append(brokerConf.getBrokerId())
+                    .append(",\"brokerIp\":\"").append(brokerConf.getBrokerIp())
+                    .append("\",\"brokerPort\":").append(brokerConf.getBrokerPort())
+                    .append(",\"runInfo\":{");
+            String strManageStatus =
+                    brokerConf.getManageStatus().getDescription();
+            Tuple2<Boolean, Boolean> pubSubStatus =
+                    brokerConf.getManageStatus().getPubSubStatus();
+            if (brokerSyncInfo == null) {
+                sBuffer.append("\"acceptPublish\":\"-\"")
+                        .append(",\"acceptSubscribe\":\"-\"")
+                        .append(",\"totalPartitionNum\":\"-\"")
+                        .append(",\"totalTopicStoreNum\":\"-\"")
+                        .append(",\"brokerManageStatus\":\"-\"");
+            } else {
+                if (pubSubStatus.getF0()) {
+                    sBuffer.append("\"acceptPublish\":")
+                            .append(brokerSyncInfo.isAcceptPublish());
+                } else {
+                    sBuffer.append("\"acceptPublish\":false");
+                }
+                if (pubSubStatus.getF1()) {
+                    sBuffer.append(",\"acceptSubscribe\":")
+                            .append(brokerSyncInfo.isAcceptSubscribe());
+                } else {
+                    sBuffer.append(",\"acceptSubscribe\":false");
+                }
+                for (TopicDeployEntity topicEntity : topicDeployInfo) {
+                    if (topicEntity == null) {
+                        continue;
+                    }
+                    totalStoreNum +=
+                            topicEntity.getNumTopicStores();
+                    totalNumPartCount +=
+                            topicEntity.getNumTopicStores() * topicEntity.getNumPartitions();
+                }
+                sBuffer.append(",\"totalPartitionNum\":").append(totalNumPartCount)
+                        .append(",\"totalTopicStoreNum\":").append(totalStoreNum)
+                        .append(",\"brokerManageStatus\":\"")
+                        .append(strManageStatus).append("\"");
+            }
+            sBuffer.append("}}");
+        }
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, dataCount);
+        return sBuffer;
+    }
+
+    /**
+     * Query broker's topic-name set info
+     *
+     * @param req
+     * @return
+     */
+    public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(512);
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, false, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+        Map<Integer, Set<String>> brokerTopicConfigMap =
+                metaDataManager.getBrokerTopicConfigInfo(brokerIds);
+        // build query result
+        int dataCount = 0;
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+        for (Map.Entry<Integer, Set<String>> entry : brokerTopicConfigMap.entrySet()) {
+            if (dataCount++ > 0) {
+                sBuilder.append(",");
+            }
+            sBuilder.append("{\"brokerId\":").append(entry.getKey()).append(",\"topicName\":[");
+            int topicCnt = 0;
+            Set<String> topicSet = entry.getValue();
+            for (String topic : topicSet) {
+                if (topicCnt++ > 0) {
+                    sBuilder.append(",");
+                }
+                sBuilder.append("\"").append(topic).append("\"");
+            }
+            sBuilder.append("],\"topicCount\":").append(topicCnt).append("}");
+        }
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, dataCount);
+        return sBuilder;
+    }
+
+    /**
+     * Query topic's broker id set
+     *
+     * @param req
+     * @return
+     */
+    public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(512);
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.WITHIP, false, false, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        boolean withIp = (Boolean) result.retData1;
+        Map<String, Map<Integer, String>> topicBrokerConfigMap =
+                metaDataManager.getTopicBrokerConfigInfo(topicNameSet);
+        // build query result
+        int dataCount = 0;
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+        for (Map.Entry<String, Map<Integer, String>> entry : topicBrokerConfigMap.entrySet()) {
+            if (dataCount++ > 0) {
+                sBuilder.append(",");
+            }
+            sBuilder.append("{\"topicName\":\"").append(entry.getKey()).append("\",\"brokerInfo\":[");
+            int topicCnt = 0;
+            Map<Integer, String> brokerMap = entry.getValue();
+            if (withIp) {
+                for (Map.Entry<Integer, String> entry1 : brokerMap.entrySet()) {
+                    if (topicCnt++ > 0) {
+                        sBuilder.append(",");
+                    }
+                    sBuilder.append("{\"brokerId\":").append(entry1.getKey())
+                            .append(",\"brokerIp\":\"").append(entry1.getValue()).append("\"}");
+                }
+            } else {
+                for (Map.Entry<Integer, String> entry1 : brokerMap.entrySet()) {
+                    if (topicCnt++ > 0) {
+                        sBuilder.append(",");
+                    }
+                    sBuilder.append(entry1.getKey());
+                }
+            }
+            sBuilder.append("],\"brokerCnt\":").append(topicCnt).append("}");
+        }
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, dataCount);
+        return sBuilder;
+    }
+
+    /**
+     * Add new topic record
+     *
+     * @param req
+     * @return
+     */
+    public StringBuilder adminAddTopicEntityInfo(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(512);
+        // valid operation authorize info
+        if (!WebParameterUtils.validReqAuthorizeInfo(req,
+                WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        // check and get operation info
+        if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Tuple3<Long, String, Date> opTupleInfo =
+                (Tuple3<Long, String, Date>) result.getRetData();
+        // check and get topicName info
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        // check and get brokerId info
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, true, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+        // check and get cluster default setting info
+        ClusterSettingEntity defClusterSetting =
+                metaDataManager.getClusterDefSetting();
+        if (defClusterSetting == null) {
+            if (!metaDataManager.addClusterDefSetting(sBuilder, result)) {
+                WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+                return sBuilder;
+            }
+            defClusterSetting = metaDataManager.getClusterDefSetting();
+        }
+        // get and valid TopicPropGroup info
+        if (!WebParameterUtils.getTopicPropInfo(req,
+                defClusterSetting.getClsDefTopicProps(), result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        TopicPropGroup topicPropInfo = (TopicPropGroup) result.getRetData();
+        /* check max message size
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.MAXMSGSIZEINMB, false,
+                TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+                TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+                TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB,
+                result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        int inMaxMsgSizeMB = (int) result.getRetData();
+         */
+        List<TopicProcessResult> retInfo =
+                metaDataManager.addTopicDeployInfo(opTupleInfo.getF0(),
+                        opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+                        topicNameSet, topicPropInfo, sBuilder, result);
+        return buildRetInfo(retInfo, sBuilder);
+    }
+
+    /**
+     * Add new topic record in batch
+     *
+     * @param req
+     * @return
+     */
+    public StringBuilder adminBatchAddTopicEntityInfo(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(512);
+        // valid operation authorize info
+        if (!WebParameterUtils.validReqAuthorizeInfo(req,
+                WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        // check and get operation info
+        if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Tuple3<Long, String, Date> opTupleInfo =
+                (Tuple3<Long, String, Date>) result.getRetData();
+        // check and get add record map
+        if (!getTopicDeployJsonSetInfo(req, true, true, opTupleInfo.getF0(),
+                opTupleInfo.getF1(), opTupleInfo.getF2(), null, sBuilder, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Map<String, TopicDeployEntity> addRecordMap =
+                (Map<String, TopicDeployEntity>) result.getRetData();
+        List<TopicProcessResult> retInfo = new ArrayList<>();
+        for (TopicDeployEntity topicDeployInfo : addRecordMap.values()) {
+            retInfo.add(metaDataManager.addTopicDeployInfo(topicDeployInfo, sBuilder, result));
+        }
+        return buildRetInfo(retInfo, sBuilder);
+    }
+
+    /**
+     * Modify topic info
+     *
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    // #lizard forgives
+    public StringBuilder adminModifyTopicEntityInfo(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(512);
+        // valid operation authorize info
+        if (!WebParameterUtils.validReqAuthorizeInfo(req,
+                WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        // check and get operation info
+        if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Tuple3<Long, String, Date> opTupleInfo =
+                (Tuple3<Long, String, Date>) result.getRetData();
+        // check and get topicName info
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        // check and get brokerId info
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, true, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+        // get and valid TopicPropGroup info
+        if (!WebParameterUtils.getTopicPropInfo(req, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
+        // modify records
+        List<TopicProcessResult> retInfo =
+                metaDataManager.modTopicConfig(opTupleInfo.getF0(),
+                        opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+                        topicNameSet, topicProps, sBuilder, result);
+        return buildRetInfo(retInfo, sBuilder);
+    }
+
+    /**
+     * Delete topic info
+     *
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    public StringBuilder adminDeleteTopicEntityInfo(HttpServletRequest req) {
+        return innModifyTopicStatusEntityInfo(req, TopicStatus.STATUS_TOPIC_SOFT_DELETE);
+    }
+
+    /**
+     * Remove topic info
+     *
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    public StringBuilder adminRemoveTopicEntityInfo(HttpServletRequest req) {
+        return innModifyTopicStatusEntityInfo(req, TopicStatus.STATUS_TOPIC_SOFT_REMOVE);
+    }
+
+    /**
+     * Redo delete topic info
+     *
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    // #lizard forgives
+    public StringBuilder adminRedoDeleteTopicEntityInfo(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuffer = new StringBuilder(512);
+        // valid operation authorize info
+        if (!WebParameterUtils.validReqAuthorizeInfo(req,
+                WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        // check and get operation info
+        if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        Tuple3<Long, String, Date> opTupleInfo =
+                (Tuple3<Long, String, Date>) result.getRetData();
+        // check and get topicName info
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        // check and get brokerId info
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, true, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+        // modify records
+        List<TopicProcessResult> retInfo =
+                metaDataManager.modRedoDelTopicConf(opTupleInfo.getF0(),
+                        opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+                        topicNameSet, sBuffer, result);
+        return buildRetInfo(retInfo, sBuffer);
+    }
+
+
+
+    private boolean getTopicDeployJsonSetInfo(HttpServletRequest req, boolean required,
+                                              boolean isCreate, long dataVerId,
+                                              String operator, Date operateDate,
+                                              List<Map<String, String>> defValue,
+                                              StringBuilder sBuilder,
+                                              ProcessResult result) {
+        if (!WebParameterUtils.getJsonArrayParamValue(req,
+                WebFieldDef.TOPICJSONSET, required, defValue, result)) {
+            return result.success;
+        }
+        List<Map<String, String>> deployJsonArray =
+                (List<Map<String, String>>) result.retData1;
+        // check and get cluster default setting info
+        ClusterSettingEntity defClusterSetting =
+                metaDataManager.getClusterDefSetting();
+        if (defClusterSetting == null) {
+            if (!metaDataManager.addClusterDefSetting(sBuilder, result)) {
+                return result.isSuccess();
+            }
+            defClusterSetting = metaDataManager.getClusterDefSetting();
+        }
+        TopicDeployEntity itemConf;
+        Map<String, TopicDeployEntity> addRecordMap = new HashMap<>();
+        // check and get topic deployment configure
+        HashMap<String, TopicDeployEntity> addedRecordMap = new HashMap<>();
+        for (int j = 0; j < deployJsonArray.size(); j++) {
+            Map<String, String> confMap = deployJsonArray.get(j);
+            // check and get operation info
+            long itemDataVerId = dataVerId;
+            String itemCreator = operator;
+            Date itemCreateDate = operateDate;
+            if (!WebParameterUtils.getAUDBaseInfo(confMap, true, result)) {
+                return result.isSuccess();
+            }
+            Tuple3<Long, String, Date> opTupleInfo =
+                    (Tuple3<Long, String, Date>) result.getRetData();
+            if (opTupleInfo.getF0() != TBaseConstants.META_VALUE_UNDEFINED) {
+                itemDataVerId = opTupleInfo.getF0();
+            }
+            if (opTupleInfo.getF1() != null) {
+                itemCreator = opTupleInfo.getF1();
+            }
+            if (opTupleInfo.getF2() != null) {
+                itemCreateDate = opTupleInfo.getF2();
+            }
+            // get topicName configure info
+            if (!WebParameterUtils.getStringParamValue(confMap,
+                    WebFieldDef.TOPICNAME, true, "", result)) {
+                return result.success;
+            }
+            String topicName = (String) result.retData1;
+            // get broker configure info
+            if (!getBrokerConfInfo(confMap, sBuilder, result)) {
+                return result.isSuccess();
+            }
+            BrokerConfEntity brokerConf =
+                    (BrokerConfEntity) result.getRetData();
+            // get and valid TopicPropGroup info
+            if (!WebParameterUtils.getTopicPropInfo(confMap,
+                    defClusterSetting.getClsDefTopicProps(), result)) {
+                return result.isSuccess();
+            }
+            TopicPropGroup topicPropInfo = (TopicPropGroup) result.getRetData();
+            /* check max message size
+            if (!WebParameterUtils.getIntParamValue(confMap,
+                    WebFieldDef.MAXMSGSIZEINMB, false,
+                    TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+                    TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+                    TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB,
+                    result)) {
+                return result.isSuccess();
+            }
+            int inMaxMsgSizeMB = (int) result.getRetData();
+             */
+            // get topicNameId field
+            int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
+            TopicCtrlEntity topicCtrlEntity =
+                    metaDataManager.getTopicCtrlByTopicName(topicName);
+            if (topicCtrlEntity != null) {
+                topicNameId = topicCtrlEntity.getTopicId();
+            }
+            itemConf = new TopicDeployEntity(itemDataVerId, itemCreator, itemCreateDate);
+            itemConf.setTopicDeployInfo(brokerConf.getBrokerId(),
+                    brokerConf.getBrokerIp(), brokerConf.getBrokerPort(), topicName);
+            itemConf.updModifyInfo(topicNameId,
+                    TBaseConstants.META_VALUE_UNDEFINED, null,
+                    TopicStatus.STATUS_TOPIC_OK, topicPropInfo);
+            addRecordMap.put(itemConf.getRecordKey(), itemConf);
+        }
+        // check result
+        if (addedRecordMap.isEmpty()) {
+            if (isCreate) {
+                result.setFailResult(sBuilder
+                        .append("Not found record in ")
+                        .append(WebFieldDef.TOPICJSONSET.name)
+                        .append(" parameter!").toString());
+                sBuilder.delete(0, sBuilder.length());
+                return result.isSuccess();
+            }
+        }
+        result.setSuccResult(addedRecordMap);
+        return result.isSuccess();
+    }
+
+    private boolean getBrokerConfInfo(Map<String, String> keyValueMap,
+                                      StringBuilder sBuilder, ProcessResult result) {
+        // get brokerId
+        if (!WebParameterUtils.getIntParamValue(keyValueMap,
+                WebFieldDef.BROKERID, true, 0, 0, result)) {
+            return result.success;
+        }
+        int brokerId = (int) result.getRetData();
+        BrokerConfEntity curEntity =
+                metaDataManager.getBrokerConfByBrokerId(brokerId);
+        if (curEntity == null) {
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                    sBuilder.append("Not found broker configure by ")
+                            .append(WebFieldDef.BROKERID.name).append(" = ").append(brokerId)
+                            .append(", please create the broker's configure first!").toString());
+            return result.isSuccess();
+        }
+        result.setSuccResult(curEntity);
+        return result.isSuccess();
+    }
+
+    private StringBuilder buildRetInfo(List<TopicProcessResult> retInfo,
+                                       StringBuilder sBuilder) {
+        int totalCnt = 0;
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+        for (TopicProcessResult entry : retInfo) {
+            if (totalCnt++ > 0) {
+                sBuilder.append(",");
+            }
+            sBuilder.append("{\"brokerId\":").append(entry.getBrokerId())
+                    .append("{\"topicName\":\"").append(entry.getTopicName()).append("\"")
+                    .append(",\"success\":").append(entry.isSuccess())
+                    .append(",\"errCode\":").append(entry.getErrCode())
+                    .append(",\"errInfo\":\"").append(entry.getErrInfo()).append("\"}");
+        }
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
+        return sBuilder;
+    }
+
+
+    /**
+     * Internal method to perform deletion and removal of topic
+     *
+     * @param req
+     * @param topicStatus
+     * @return
+     */
+    // #lizard forgives
+    private StringBuilder innModifyTopicStatusEntityInfo(HttpServletRequest req,
+                                                         TopicStatus topicStatus) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuffer = new StringBuilder(512);
+        // valid operation authorize info
+        if (!WebParameterUtils.validReqAuthorizeInfo(req,
+                WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        // check and get operation info
+        if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        Tuple3<Long, String, Date> opTupleInfo =
+                (Tuple3<Long, String, Date>) result.getRetData();
+        // check and get topicName info
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        // check and get brokerId info
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, true, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+        // modify records
+        List<TopicProcessResult> retInfo =
+                metaDataManager.modDelOrRmvTopicConf(opTupleInfo.getF0(),
+                        opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+                        topicNameSet, topicStatus, sBuffer, result);
+        return buildRetInfo(retInfo, sBuffer);
+    }
+
+}