You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/03/13 13:30:51 UTC

[incubator-inlong] branch master updated: [INLONG-3105][TubeMQ] Add MetaStoreMapper related implementation (#3106)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ab01685  [INLONG-3105][TubeMQ] Add MetaStoreMapper related implementation (#3106)
ab01685 is described below

commit ab01685bbaa72d7ebbb3ca7e8439cb9b2c7efd42
Author: gosonzhang <46...@qq.com>
AuthorDate: Sun Mar 13 21:30:38 2022 +0800

    [INLONG-3105][TubeMQ] Add MetaStoreMapper related implementation (#3106)
---
 .../server/common/utils/WebParameterUtils.java     |   12 +-
 .../server/master/metamanage/MetaDataManager.java  |   12 +-
 .../metamanage/metastore/MetaStoreService.java     |   17 +-
 .../metastore/dao/mapper/BrokerConfigMapper.java   |    2 +-
 .../metastore/dao/mapper/ClusterConfigMapper.java  |   23 +-
 .../metastore/dao/mapper/ConsumeCtrlMapper.java    |    4 +-
 .../metastore/dao/mapper/MetaStoreMapper.java      |  570 +++++++++
 .../metastore/dao/mapper/TopicDeployMapper.java    |   27 +-
 .../metastore/impl/AbsBrokerConfigMapperImpl.java  |   52 +-
 .../metastore/impl/AbsClusterConfigMapperImpl.java |   79 +-
 .../metastore/impl/AbsConsumeCtrlMapperImpl.java   |   78 +-
 .../metastore/impl/AbsGroupResCtrlMapperImpl.java  |    4 +-
 .../metastore/impl/AbsMetaStoreMapperImpl.java     | 1214 ++++++++++++++++++++
 .../metastore/impl/AbsTopicCtrlMapperImpl.java     |    4 +-
 .../metastore/impl/AbsTopicDeployMapperImpl.java   |  193 ++--
 .../impl/bdbimpl/BdbMetaStoreServiceImpl.java      |  115 +-
 16 files changed, 2077 insertions(+), 329 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index 0bb3c71..4bd8b2c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -378,11 +378,11 @@ public class WebParameterUtils {
      * @param brokerWebPort  broker web port
      * @param strBuff        string buffer
      * @param result     check result of parameter value
-     * @return   true for illegal, false for legal
+     * @return   true for valid, false for invalid
      */
-    public static boolean isConflictedPortsSet(int brokerPort, int brokerTlsPort,
-                                               int brokerWebPort, StringBuilder strBuff,
-                                               ProcessResult result) {
+    public static boolean isValidPortsSet(int brokerPort, int brokerTlsPort,
+                                          int brokerWebPort, StringBuilder strBuff,
+                                          ProcessResult result) {
         if (brokerPort == brokerWebPort || brokerTlsPort == brokerWebPort) {
             result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
                     strBuff.append(DataOpErrCode.DERR_CONFLICT_VALUE.getDescription())
@@ -392,9 +392,9 @@ public class WebParameterUtils {
                             .append(" cannot be the same as the value of")
                             .append(WebFieldDef.BROKERWEBPORT.name).toString());
             strBuff.delete(0, strBuff.length());
-            return !result.isSuccess();
+            return result.isSuccess();
         }
-        return false;
+        return true;
     }
 
     /**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index 6da5457..f35104e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -425,7 +425,7 @@ public class MetaDataManager implements Server {
         if (isAddOp) {
             if (metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId()) == null
                     && metaStoreService.getBrokerConfByBrokerIp(entity.getBrokerIp()) == null) {
-                if (!WebParameterUtils.isConflictedPortsSet(entity.getBrokerPort(),
+                if (WebParameterUtils.isValidPortsSet(entity.getBrokerPort(),
                         entity.getBrokerTLSPort(), entity.getBrokerWebPort(), sBuffer, result)) {
                     if (metaStoreService.addBrokerConf(entity, sBuffer, result)) {
                         this.tMaster.getBrokerRunManager().updBrokerStaticInfo(entity);
@@ -453,7 +453,7 @@ public class MetaDataManager implements Server {
                         entity.getBrokerTLSPort(), entity.getBrokerWebPort(),
                         entity.getRegionId(), entity.getGroupId(),
                         entity.getManageStatus(), entity.getTopicProps())) {
-                    if (!WebParameterUtils.isConflictedPortsSet(newEntity.getBrokerPort(),
+                    if (WebParameterUtils.isValidPortsSet(newEntity.getBrokerPort(),
                             newEntity.getBrokerTLSPort(), newEntity.getBrokerWebPort(),
                             sBuffer, result)) {
                         if (metaStoreService.updBrokerConf(newEntity, sBuffer, result)) {
@@ -1580,9 +1580,9 @@ public class MetaDataManager implements Server {
             newConf.updModifyInfo(opEntity.getDataVerId(), brokerPort,
                     brokerTlsPort, brokerWebPort, maxMsgSizeMB, qryPriorityId,
                     flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps);
-            if (!WebParameterUtils.isConflictedPortsSet(newConf.getBrokerPort(),
+            if (WebParameterUtils.isValidPortsSet(newConf.getBrokerPort(),
                     newConf.getBrokerTLSPort(), newConf.getBrokerWebPort(), sBuffer, result)) {
-                metaStoreService.addClusterConfig(newConf, sBuffer, result);
+                metaStoreService.addUpdClusterConfig(newConf, sBuffer, result);
             }
         } else {
             newConf = curConf.clone();
@@ -1590,10 +1590,10 @@ public class MetaDataManager implements Server {
             if (newConf.updModifyInfo(opEntity.getDataVerId(), brokerPort,
                     brokerTlsPort, brokerWebPort, maxMsgSizeMB, qryPriorityId,
                     flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps)) {
-                if (!WebParameterUtils.isConflictedPortsSet(newConf.getBrokerPort(),
+                if (WebParameterUtils.isValidPortsSet(newConf.getBrokerPort(),
                         newConf.getBrokerTLSPort(), newConf.getBrokerWebPort(),
                         sBuffer, result)) {
-                    metaStoreService.updClusterConfig(newConf, sBuffer, result);
+                    metaStoreService.addUpdClusterConfig(newConf, sBuffer, result);
                 }
             } else {
                 result.setSuccResult(null);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 3daeb15..fb7821b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -38,26 +38,15 @@ public interface MetaStoreService extends KeepAlive, Server {
     // cluster default configure api
 
     /**
-     * Add or update cluster default setting
+     * Add cluster default setting, or update records if data exists
      *
      * @param entity     the cluster default setting entity will be add
      * @param strBuff  the print info string buffer
      * @param result     the process result return
      * @return true if success otherwise false
      */
-    boolean addClusterConfig(ClusterSettingEntity entity,
-                             StringBuilder strBuff, ProcessResult result);
-
-    /**
-     * Update cluster default setting
-     *
-     * @param entity     the cluster default setting entity will be add
-     * @param strBuff  the print info string buffer
-     * @param result     the process result return
-     * @return true if success otherwise false
-     */
-    boolean updClusterConfig(ClusterSettingEntity entity,
-                             StringBuilder strBuff, ProcessResult result);
+    boolean addUpdClusterConfig(ClusterSettingEntity entity,
+                                StringBuilder strBuff, ProcessResult result);
 
     ClusterSettingEntity getClusterConfig();
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
index 2a9ff53..dba9932 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
@@ -59,7 +59,7 @@ public interface BrokerConfigMapper extends AbstractMapper {
      * @return  the process result
      */
     boolean updBrokerMngStatus(BaseEntity opEntity,
-                               Integer brokerId, ManageStatus newMngStatus,
+                               int brokerId, ManageStatus newMngStatus,
                                StringBuilder strBuff, ProcessResult result);
 
     /**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ClusterConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ClusterConfigMapper.java
index 559a523..e173565 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ClusterConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ClusterConfigMapper.java
@@ -23,26 +23,24 @@ import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.Cl
 public interface ClusterConfigMapper extends AbstractMapper {
 
     /**
-     * Add a new cluster setting info into store
+     * Add or replace cluster setting info into store
      *
      * @param entity   need add record
      * @param strBuff  the string buffer
      * @param result   process result with old value
      * @return  the process result
      */
-    boolean addClusterConfig(ClusterSettingEntity entity,
-                             StringBuilder strBuff, ProcessResult result);
+    boolean addUpdClusterConfig(ClusterSettingEntity entity,
+                                StringBuilder strBuff, ProcessResult result);
 
     /**
-     * Update cluster setting info in store
+     * delete current cluster setting from store
      *
-     * @param entity   need add record
+     * @param result the process result
      * @param strBuff  the string buffer
-     * @param result   process result with old value
      * @return  the process result
      */
-    boolean updClusterConfig(ClusterSettingEntity entity,
-                             StringBuilder strBuff, ProcessResult result);
+    boolean delClusterConfig(StringBuilder strBuff, ProcessResult result);
 
     /**
      * get current cluster setting from store
@@ -50,13 +48,4 @@ public interface ClusterConfigMapper extends AbstractMapper {
      * @return current cluster setting, null or object, only read
      */
     ClusterSettingEntity getClusterConfig();
-
-    /**
-     * delete current cluster setting from store
-     *
-     * @param result the process result
-     * @param strBuff  the string buffer
-     * @return  the process result
-     */
-    boolean delClusterConfig(StringBuilder strBuff, ProcessResult result);
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
index 6aa6606..971e088 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
@@ -36,9 +36,9 @@ public interface ConsumeCtrlMapper extends AbstractMapper {
     boolean delGroupConsumeCtrlConf(String groupName, String topicName,
                                     StringBuilder strBuff, ProcessResult result);
 
-    boolean isTopicNameInUsed(String topicName);
+    boolean isTopicNameInUse(String topicName);
 
-    boolean hasGroupConsumeCtrlConf(String groupName);
+    boolean isGroupNameInUse(String groupName);
 
     GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String recordKey);
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaStoreMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaStoreMapper.java
new file mode 100644
index 0000000..1470b7f
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaStoreMapper.java
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
+
+public interface MetaStoreMapper {
+
+    /**
+     * Add or update cluster default setting
+     * @param opEntity       operator information
+     * @param brokerPort     broker port
+     * @param brokerTlsPort  broker tls port
+     * @param brokerWebPort  broker web port
+     * @param maxMsgSizeMB   max cluster message size in MB
+     * @param qryPriorityId  the default query priority id
+     * @param flowCtrlEnable enable or disable flow control function
+     * @param flowRuleCnt    the default flow rule count
+     * @param flowCtrlInfo   the default flow control information
+     * @param topicProps     default topic property information
+     * @param strBuff        the print information string buffer
+     * @param result         the process result return
+     * @return true if success otherwise false
+     */
+    boolean addOrUpdClusterDefSetting(BaseEntity opEntity, int brokerPort,
+                                      int brokerTlsPort, int brokerWebPort,
+                                      int maxMsgSizeMB, int qryPriorityId,
+                                      Boolean flowCtrlEnable, int flowRuleCnt,
+                                      String flowCtrlInfo, TopicPropGroup topicProps,
+                                      StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Get cluster configure information
+     *
+     * @param isMustConf  whether must be configured data.
+     * @return the cluster configure
+     */
+    ClusterSettingEntity getClusterDefSetting(boolean isMustConf);
+
+    // ////////////////////////////////////////////////////////////
+    /**
+     * Add or update broker configure information
+     *
+     * @param isAddOp        whether add operation
+     * @param opEntity       operator information
+     * @param brokerId       broker id
+     * @param brokerIp       broker ip
+     * @param brokerPort     broker port
+     * @param brokerTlsPort  broker tls port
+     * @param brokerWebPort  broker web port
+     * @param regionId       region id
+     * @param groupId        group id
+     * @param mngStatus      manage status
+     * @param topicProps     default topic property information
+     * @param strBuff        the print information string buffer
+     * @param result         the process result return
+     * @return true if success otherwise false
+     */
+    boolean addOrUpdBrokerConfig(boolean isAddOp, BaseEntity opEntity,
+                                 int brokerId, String brokerIp, int brokerPort,
+                                 int brokerTlsPort, int brokerWebPort,
+                                 int regionId, int groupId, ManageStatus mngStatus,
+                                 TopicPropGroup topicProps, StringBuilder strBuff,
+                                 ProcessResult result);
+
+    /**
+     * Add or update broker configure information
+     *
+     * @param isAddOp   whether add operation
+     * @param entity    need add or update configure information
+     * @param strBuff   the print information string buffer
+     * @param result    the process result return
+     * @return true if success otherwise false
+     */
+    boolean addOrUpdBrokerConfig(boolean isAddOp, BrokerConfEntity entity,
+                                 StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Change broker configure status
+     *
+     * @param opEntity      operator
+     * @param brokerId      the id of the broker whose manage status will be changed
+     * @param newMngStatus  manage status
+     * @param strBuff       the print information string buffer
+     * @param result        the process result return
+     * @return true if success otherwise false
+     */
+    boolean changeBrokerConfStatus(BaseEntity opEntity,
+                                   int brokerId, ManageStatus newMngStatus,
+                                   StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Delete broker configure information
+     *
+     * @param operator  operator
+     * @param brokerId  need deleted broker id
+     * @param rsvData   reserve broker data
+     * @param strBuff   the print information string buffer
+     * @param result     the process result return
+     * @return true if success otherwise false
+     */
+    boolean delBrokerConfInfo(String operator, int brokerId, boolean rsvData,
+                              StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Query broker configure information
+     *
+     * @param qryEntity  the query condition, must not null
+     * @return  the query result
+     */
+    Map<Integer, BrokerConfEntity> getBrokerConfInfo(BrokerConfEntity qryEntity);
+
+    /**
+     * Get broker configure information
+     *
+     * @param brokerIdSet  the broker id set need to query
+     * @param brokerIpSet  the broker ip set need to query
+     * @param qryEntity    the query condition
+     * @return broker configure information
+     */
+    Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
+                                                     Set<String> brokerIpSet,
+                                                     BrokerConfEntity qryEntity);
+
+    /**
+     * Get broker configure information
+     *
+     * @param brokerId  need queried broker id
+     * @return  the broker configure record
+     */
+    BrokerConfEntity getBrokerConfByBrokerId(int brokerId);
+
+    /**
+     * Get broker configure information
+     *
+     * @param brokerIp  need queried broker ip
+     * @return  the broker configure record
+     */
+    BrokerConfEntity getBrokerConfByBrokerIp(String brokerIp);
+
+    // ////////////////////////////////////////////////////////////
+
+    /**
+     * Add or Update topic control configure info
+     *
+     * @param isAddOp        whether add operation
+     * @param opEntity       operator information
+     * @param topicName      topic name
+     * @param topicNameId    the topic name id
+     * @param enableTopicAuth  whether enable topic authentication
+     * @param maxMsgSizeInMB   the max message size in MB
+     * @param strBuff          the print info string buffer
+     * @param result           the process result return
+     * @return true if success otherwise false
+     */
+    boolean addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
+                                  String topicName, int topicNameId,
+                                  Boolean enableTopicAuth, int maxMsgSizeInMB,
+                                  StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Add or Update topic control configure info
+     *
+     * @param isAddOp  whether add operation
+     * @param entity   the topic control info entity to be added or updated
+     * @param strBuff  the print info string buffer
+     * @param result   the process result return
+     * @return true if success otherwise false
+     */
+    boolean addOrUpdTopicCtrlConf(boolean isAddOp, TopicCtrlEntity entity,
+                                  StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Add topic control record, or update records if data exists
+     *
+     * @param opEntity operator information
+     * @param topicName topic info
+     * @param enableTopicAuth whether to enable topic authentication
+     * @param strBuff  the print info string buffer
+     * @param result     the process result return
+     * @return true if success otherwise false
+     */
+    boolean insertTopicCtrlConf(BaseEntity opEntity,
+                                String topicName, Boolean enableTopicAuth,
+                                StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Add topic control record, or update records if data exists
+     *
+     * @param entity  the topic control entity to be added or updated
+     * @param strBuff  the print info string buffer
+     * @param result     the process result return
+     * @return true if success otherwise false
+     */
+    boolean insertTopicCtrlConf(TopicCtrlEntity entity,
+                                StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Delete topic control configure
+     *
+     * @param operator   operator
+     * @param topicName  the topicName will be deleted
+     * @param strBuff  the print info string buffer
+     * @param result     the process result return
+     * @return true if success otherwise false
+     */
+    boolean delTopicCtrlConf(String operator, String topicName,
+                             StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Get topic control record by topic name
+     *
+     * @param topicName  the topic name
+     * @return    the topic control record
+     */
+    TopicCtrlEntity getTopicCtrlByTopicName(String topicName);
+
+    /**
+     * Get topic max message size in MB configure
+     *
+     * @param topicName   the topic name
+     * @return   the max message size
+     */
+    int getTopicMaxMsgSizeInMB(String topicName);
+
+    /**
+     * Get topic control entity list
+     *
+     * @param qryEntity   the query condition
+     * @return   the query result list
+     */
+    List<TopicCtrlEntity> queryTopicCtrlConf(TopicCtrlEntity qryEntity);
+
+    /**
+     * Get topic control entity map
+     *
+     * @param topicNameSet  the topic name set
+     * @param qryEntity     the query condition
+     * @return   the query result map
+     */
+    Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
+                                                  TopicCtrlEntity qryEntity);
+
+    // ////////////////////////////////////////////////////////////
+
+    /**
+     * Add or update topic deploy information
+     *
+     * @param isAddOp         whether add operation
+     * @param opEntity        the operation information
+     * @param brokerId        the broker id
+     * @param topicName       the topic name
+     * @param deployStatus    the deploy status
+     * @param topicPropInfo   the topic property set
+     * @param strBuff         the string buffer
+     * @param result          the process result
+     * @return                true if success otherwise false
+     */
+    boolean addOrUpdTopicDeployInfo(boolean isAddOp, BaseEntity opEntity,
+                                    int brokerId, String topicName,
+                                    TopicStatus deployStatus,
+                                    TopicPropGroup topicPropInfo,
+                                    StringBuilder strBuff,
+                                    ProcessResult result);
+
+    /**
+     * Add or update topic deploy configure info
+     *
+     * @param isAddOp  whether add operation
+     * @param deployEntity   the topic deploy info entity
+     * @param strBuff  the print info string buffer
+     * @param result   the process result return
+     * @return true if success otherwise false
+     */
+    boolean addOrUpdTopicDeployInfo(boolean isAddOp, TopicDeployEntity deployEntity,
+                                    StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Update topic deploy status info
+     *
+     * @param opEntity       the operation information
+     * @param brokerId       the broker id
+     * @param topicName      the topic name
+     * @param topicStatus    the topic deploy status
+     * @param strBuff  the print info string buffer
+     * @param result   the process result return
+     * @return true if success otherwise false
+     */
+    boolean updTopicDeployStatusInfo(BaseEntity opEntity, int brokerId,
+                                     String topicName, TopicStatus topicStatus,
+                                     StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Get broker topic entity, if query entity is null, return all topic entity
+     *
+     * @param topicNameSet  query by topicNameSet
+     * @param brokerIdSet   query by brokerIdSet
+     * @param qryEntity     query conditions
+     * @return topic deploy entity map
+     */
+    Map<String, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+                                                               Set<Integer> brokerIdSet,
+                                                               TopicDeployEntity qryEntity);
+
+    /**
+     * Get broker topic entity by topic name set and broker id set
+     *
+     * @param topicNameSet   the query topic set
+     * @param brokerIdSet    the query broker id set
+     * @return topic deploy entity map
+     */
+    Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+                                                                Set<Integer> brokerIdSet);
+
+    /**
+     * Get broker topic entity by topic name and broker id set
+     *
+     * @param topicNameSet   the query topic set
+     * @param brokerIdSet    the query broker id set
+     * @return topic entity map
+     */
+    Map<String, List<TopicDeployEntity>> getTopicConfInfoByTopicAndBrokerIds(
+            Set<String> topicNameSet, Set<Integer> brokerIdSet);
+
+    // ////////////////////////////////////////////////////////////
+
+    /**
+     * Add or update group resource configure information
+     *
+     * @param isAddOp        whether add operation
+     * @param opEntity       operator information
+     * @param groupName      the group name
+     * @param resCheckEnable  whether check resource rate
+     * @param allowedBClientRate  the allowed broker-client rate
+     * @param qryPriorityId  the query priority id
+     * @param flowCtrlEnable enable or disable flow control
+     * @param flowRuleCnt    the flow control rule count
+     * @param flowCtrlInfo   the flow control information
+     * @param strBuff        the print information string buffer
+     * @param result         the process result return
+     * @return true if success otherwise false
+     */
+    boolean addOrUpdGroupResCtrlConf(boolean isAddOp, BaseEntity opEntity,
+                                     String groupName, Boolean resCheckEnable,
+                                     int allowedBClientRate, int qryPriorityId,
+                                     Boolean flowCtrlEnable, int flowRuleCnt,
+                                     String flowCtrlInfo, StringBuilder strBuff,
+                                     ProcessResult result);
+
+    /**
+     * Add or update group resource control configure info, as selected by isAddOp
+     *
+     * @param entity     the group resource control info entity to be added or updated
+     * @param strBuff  the print info string buffer
+     * @param result     the process result return
+     * @return true if success otherwise false
+     */
+    boolean addOrUpdGroupResCtrlConf(boolean isAddOp, GroupResCtrlEntity entity,
+                                     StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Add group control configure, or update records if data exists
+     *
+     * @param opEntity       operator information
+     * @param groupName      the group name
+     * @param qryPriorityId  the query priority id
+     * @param flowCtrlEnable enable or disable flow control
+     * @param flowRuleCnt    the flow control rule count
+     * @param flowCtrlRuleInfo   the flow control information
+     * @param strBuff        the print information string buffer
+     * @param result         the process result return
+     * @return true if success otherwise false
+     */
+    boolean insertGroupCtrlConf(BaseEntity opEntity, String groupName,
+                                int qryPriorityId, Boolean flowCtrlEnable,
+                                int flowRuleCnt, String flowCtrlRuleInfo,
+                                StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Add group control configure, or update records if data exists
+     *
+     * @param opEntity         operator information
+     * @param groupName        the group name
+     * @param resChkEnable     resource check status
+     * @param allowedB2CRate   allowed B2C rate
+     * @param strBuff          the print info string buffer
+     * @param result           the process result return
+     * @return true if success otherwise false
+     */
+    boolean insertGroupCtrlConf(BaseEntity opEntity, String groupName,
+                                Boolean resChkEnable, int allowedB2CRate,
+                                StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Add group control configure, or update records if data exists
+     *
+     * @param entity  the group resource control info entity to be added
+     * @param strBuff   the print info string buffer
+     * @param result    the process result return
+     * @return true if success otherwise false
+     */
+    boolean insertGroupCtrlConf(GroupResCtrlEntity entity,
+                                StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Delete group control information
+     *
+     * @param operator       operator
+     * @param groupName      need deleted group
+     * @param strBuff        the print info string buffer
+     * @param result         the process result return
+     * @return    delete result
+     */
+    boolean delGroupCtrlConf(String operator, String groupName,
+                             StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Get group control information by group and query condition
+     *
+     * @param groupSet       need queried group set
+     * @param qryEntity      query condition
+     * @return    query result
+     */
+    Map<String, GroupResCtrlEntity> getGroupCtrlConf(Set<String> groupSet,
+                                                     GroupResCtrlEntity qryEntity);
+
+    /**
+     * Get group control information by group name
+     *
+     * @param groupName       need queried group name
+     * @return    query result
+     */
+    GroupResCtrlEntity getGroupCtrlConf(String groupName);
+
+    // //////////////////////////////////////////////////////////////////
+
+    /**
+     * Add or update group consume control information
+     *
+     * @param isAddOp        whether add operation
+     * @param opEntity       operator information
+     * @param groupName      the group name
+     * @param topicName      the topic name
+     * @param enableCsm      enable or disable consume
+     * @param disableRsn     the disable reason
+     * @param enableFlt      enable or disable filter
+     * @param fltCondStr     the filter conditions
+     * @param strBuff        the print information string buffer
+     * @param result         the process result return
+     * @return true if success otherwise false
+     */
+    boolean addOrUpdConsumeCtrlInfo(boolean isAddOp, BaseEntity opEntity,
+                                    String groupName, String topicName,
+                                    Boolean enableCsm, String disableRsn,
+                                    Boolean enableFlt, String fltCondStr,
+                                    StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * add or update group's consume control information
+     *
+     * @param isAddOp   whether add operation
+     * @param entity    need add or update group configure info
+     * @param strBuff   the print info string buffer
+     * @param result    the process result return
+     * @return true if success otherwise false
+     */
+    boolean addOrUpdConsumeCtrlInfo(boolean isAddOp, GroupConsumeCtrlEntity entity,
+                                    StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Add consume control information, or update records if data exists
+     *
+     * @param opEntity   add or update base information, include creator, create time, etc.
+     * @param groupName  add or update groupName information
+     * @param topicName  add or update topicName information
+     * @param enableCsm  add or update consume enable status information
+     * @param disReason  add or update disable consume reason
+     * @param enableFlt  add or update filter enable status information
+     * @param fltCondStr add or update filter configure information
+     * @param strBuff    the print info string buffer
+     * @param result     the process result return
+     * @return    process result
+     */
+    boolean insertConsumeCtrlInfo(BaseEntity opEntity, String groupName,
+                                  String topicName, Boolean enableCsm,
+                                  String disReason, Boolean enableFlt,
+                                  String fltCondStr, StringBuilder strBuff,
+                                  ProcessResult result);
+
+    /**
+     * Add consume control information, or update records if data exists
+     *
+     * @param entity     add or update group consume control info
+     * @param strBuff    the print info string buffer
+     * @param result     the process result return
+     * @return    process result
+     */
+    boolean insertConsumeCtrlInfo(GroupConsumeCtrlEntity entity,
+                                  StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Delete consume control configure
+     *
+     * @param operator   the operator
+     * @param groupName  the group name
+     * @param topicName  the topic name
+     * @param strBuff    the print info string buffer
+     * @param result     the process result return
+     * @return    process result
+     */
+    boolean delConsumeCtrlConf(String operator,
+                               String groupName, String topicName,
+                               StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Get all group consume control records for a specific topic
+     *
+     * @param topicName  the queried topic name
+     * @return group consume control list
+     */
+    List<GroupConsumeCtrlEntity> getConsumeCtrlByTopic(String topicName);
+
+    /**
+     * Get all topics whose consumption is disabled for a specific group
+     *
+     * @param groupName  the queried group name
+     * @return  the set of topic names whose consumption is disabled
+     */
+    Set<String> getDisableTopicByGroupName(String groupName);
+
+    /**
+     * Get group consume control configure for topic & group set
+     *
+     * @param groupSet the group name set
+     * @param topicSet the topic name set
+     * @param qryEntry the query conditions
+     * @return  the queried consume control record
+     */
+    Map<String, List<GroupConsumeCtrlEntity>> getGroupConsumeCtrlConf(
+            Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry);
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index d8d8e28..05cb02b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -21,6 +21,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 
 public interface TopicDeployMapper extends AbstractMapper {
@@ -33,8 +35,8 @@ public interface TopicDeployMapper extends AbstractMapper {
      * @param result   the process result
      * @return         whether success
      */
-    boolean addTopicConf(TopicDeployEntity entity,
-                         StringBuilder strBuff, ProcessResult result);
+    boolean addTopicDeployConf(TopicDeployEntity entity,
+                               StringBuilder strBuff, ProcessResult result);
 
     /**
      * Update the topic deploy configure info from store
@@ -44,8 +46,23 @@ public interface TopicDeployMapper extends AbstractMapper {
      * @param result   the process result
      * @return         whether success
      */
-    boolean updTopicConf(TopicDeployEntity entity,
-                         StringBuilder strBuff, ProcessResult result);
+    boolean updTopicDeployConf(TopicDeployEntity entity,
+                               StringBuilder strBuff, ProcessResult result);
+
+    /**
+     * Update topic deploy status info from store
+     *
+     * @param opEntity    the operator info
+     * @param brokerId    the broker id
+     * @param topicName   the topic name
+     * @param topicStatus the new topic status
+     * @param strBuff     the string buffer
+     * @param result      the process result
+     * @return            whether success
+     */
+    boolean updTopicDeployStatus(BaseEntity opEntity, int brokerId,
+                                 String topicName, TopicStatus topicStatus,
+                                 StringBuilder strBuff, ProcessResult result);
 
     /**
      * delete topic deploy configure info from store
@@ -55,7 +72,7 @@ public interface TopicDeployMapper extends AbstractMapper {
      * @param result     the process result
      * @return           whether success
      */
-    boolean delTopicConf(String recordKey, StringBuilder strBuff, ProcessResult result);
+    boolean delTopicDeployConf(String recordKey, StringBuilder strBuff, ProcessResult result);
 
     /**
      * delete topic deploy configure info from store
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsBrokerConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsBrokerConfigMapperImpl.java
index 9ee35c3..c466667 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsBrokerConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsBrokerConfigMapperImpl.java
@@ -22,6 +22,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
@@ -66,7 +68,7 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
             return result.isSuccess();
         }
         // Check whether the configured ports conflict in the record
-        if (WebParameterUtils.isConflictedPortsSet(entity.getBrokerPort(),
+        if (!WebParameterUtils.isValidPortsSet(entity.getBrokerPort(),
                 entity.getBrokerTLSPort(), entity.getBrokerWebPort(), strBuff, result)) {
             return result.isSuccess();
         }
@@ -104,26 +106,26 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
             return result.isSuccess();
         }
         // Check whether the configured ports conflict in the record
-        if (WebParameterUtils.isConflictedPortsSet(newEntity.getBrokerPort(),
+        if (!WebParameterUtils.isValidPortsSet(newEntity.getBrokerPort(),
                 newEntity.getBrokerTLSPort(), newEntity.getBrokerWebPort(),
                 strBuff, result)) {
             return result.isSuccess();
         }
         // Check manage status
-        if (isIllegalManageStatusChange(newEntity, curEntity, strBuff, result)) {
+        if (!isValidMngStatusChange(newEntity, curEntity, strBuff, result)) {
             return result.isSuccess();
         }
         // Store data to persistent
         if (putConfig2Persistent(newEntity, strBuff, result)) {
             putRecord2Caches(newEntity);
-            result.setSuccResult(curEntity);
+            result.setSuccResult(null);
         }
         return result.isSuccess();
     }
 
     @Override
     public boolean updBrokerMngStatus(BaseEntity opEntity,
-                                      Integer brokerId, ManageStatus newMngStatus,
+                                      int brokerId, ManageStatus newMngStatus,
                                       StringBuilder strBuff, ProcessResult result) {
         // Check the existence of records by brokerId
         BrokerConfEntity curEntity = brokerConfCache.get(brokerId);
@@ -139,22 +141,21 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
         BrokerConfEntity newEntity = curEntity.clone();
         newEntity.updBaseModifyInfo(opEntity);
         if (!newEntity.updModifyInfo(opEntity.getDataVerId(),
-                curEntity.getBrokerPort(), curEntity.getBrokerTLSPort(),
-                curEntity.getBrokerWebPort(), curEntity.getRegionId(),
-                curEntity.getGroupId(), newMngStatus,
-                curEntity.getTopicProps())) {
+                TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+                TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+                TBaseConstants.META_VALUE_UNDEFINED, newMngStatus, null)) {
             result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
                     "Broker configure not changed!");
             return result.isSuccess();
         }
         // Check manage status
-        if (isIllegalManageStatusChange(newEntity, curEntity, strBuff, result)) {
+        if (!isValidMngStatusChange(newEntity, curEntity, strBuff, result)) {
             return result.isSuccess();
         }
         // Store data to persistent
         if (putConfig2Persistent(newEntity, strBuff, result)) {
             putRecord2Caches(newEntity);
-            result.setSuccResult(curEntity);
+            result.setSuccResult(null);
         }
         return result.isSuccess();
     }
@@ -168,20 +169,11 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
             result.setSuccResult(null);
             return result.isSuccess();
         }
-        // Check broker's manage status
-        if (curEntity.getManageStatus().isOnlineStatus()) {
-            result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
-                    strBuff.append("Illegal manage status, please offline the broker(")
-                            .append(WebFieldDef.BROKERID.name).append("=")
-                            .append(curEntity.getBrokerId()).append(") first!").toString());
-            strBuff.delete(0, strBuff.length());
-            return result.isSuccess();
-        }
         // Delete record from persistent
         delConfigFromPersistent(brokerId, strBuff);
         // Clear cache data
         delRecordFromCaches(brokerId);
-        result.setSuccResult(curEntity);
+        result.setSuccResult(null);
         return result.isSuccess();
     }
 
@@ -384,20 +376,20 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
     }
 
     /**
-     * Check whether the management status change is illegal
+     * Check whether the management status change is legal
      *
      * @param newEntity  the entity to be updated
      * @param curEntity  the current entity
      * @param strBuff    string buffer
      * @param result     check result of parameter value
-     * @return  true for illegal, false for legal
+     * @return  true for valid, false for invalid
      */
-    private boolean isIllegalManageStatusChange(BrokerConfEntity newEntity,
-                                                BrokerConfEntity curEntity,
-                                                StringBuilder strBuff,
-                                                ProcessResult result) {
+    private boolean isValidMngStatusChange(BrokerConfEntity newEntity,
+                                           BrokerConfEntity curEntity,
+                                           StringBuilder strBuff,
+                                           ProcessResult result) {
         if (newEntity.getManageStatus() == curEntity.getManageStatus()) {
-            return false;
+            return true;
         }
         if (((newEntity.getManageStatus().getCode() < ManageStatus.STATUS_MANAGE_ONLINE.getCode())
                 && (curEntity.getManageStatus().getCode() >= ManageStatus.STATUS_MANAGE_ONLINE.getCode()))
@@ -411,8 +403,8 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
                             .append(" for the broker(").append(WebFieldDef.BROKERID.name).append("=")
                             .append(curEntity.getBrokerId()).append(")!").toString());
             strBuff.delete(0, strBuff.length());
-            return !result.isSuccess();
+            return result.isSuccess();
         }
-        return false;
+        return true;
     }
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsClusterConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsClusterConfigMapperImpl.java
index 827c44a..16c3411 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsClusterConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsClusterConfigMapperImpl.java
@@ -39,74 +39,42 @@ public abstract class AbsClusterConfigMapperImpl implements ClusterConfigMapper
     }
 
     @Override
-    public boolean addClusterConfig(ClusterSettingEntity entity,
-                                    StringBuilder strBuff, ProcessResult result) {
+    public boolean addUpdClusterConfig(ClusterSettingEntity entity,
+                                       StringBuilder strBuff, ProcessResult result) {
+        ClusterSettingEntity newEntity;
         // Check whether the configure record already exist
         ClusterSettingEntity curEntity = metaDataCache.get(entity.getRecordKey());
         if (curEntity == null) {
-            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
-                    strBuff.append("Existed record found for ")
-                            .append(entity.getRecordKey()).toString());
-            strBuff.delete(0, strBuff.length());
-            return result.isSuccess();
+            newEntity = entity.clone();
+        } else {
+            // Build the entity that need to be updated
+            newEntity = curEntity.clone();
+            newEntity.updBaseModifyInfo(entity);
+            if (!newEntity.updModifyInfo(entity.getDataVerId(),
+                    entity.getBrokerPort(), entity.getBrokerTLSPort(),
+                    entity.getBrokerWebPort(), entity.getMaxMsgSizeInMB(),
+                    entity.getQryPriorityId(), entity.enableFlowCtrl(),
+                    entity.getGloFlowCtrlRuleCnt(), entity.getGloFlowCtrlRuleInfo(),
+                    entity.getClsDefTopicProps())) {
+                result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                        "Cluster configure not changed!");
+                return result.isSuccess();
+            }
         }
         // Check whether the configured ports conflict in the record
-        if (WebParameterUtils.isConflictedPortsSet(entity.getBrokerPort(),
-                entity.getBrokerTLSPort(), entity.getBrokerWebPort(),
-                strBuff, result)) {
-            return result.isSuccess();
-        }
-        // Store data to persistent
-        if (putConfig2Persistent(entity, strBuff, result)) {
-            metaDataCache.put(entity.getRecordKey(), entity);
-        }
-        return result.isSuccess();
-    }
-
-    @Override
-    public boolean updClusterConfig(ClusterSettingEntity entity,
-                                    StringBuilder strBuff, ProcessResult result) {
-        // Check for the record to be updated
-        ClusterSettingEntity curEntity = metaDataCache.get(entity.getRecordKey());
-        if (curEntity == null) {
-            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
-                    strBuff.append("Not found cluster configure for (")
-                            .append(entity.getRecordKey()).append(")!").toString());
-            return result.isSuccess();
-        }
-        // Build the entity that need to be updated
-        ClusterSettingEntity newEntity = curEntity.clone();
-        newEntity.updBaseModifyInfo(entity);
-        if (!newEntity.updModifyInfo(entity.getDataVerId(),
-                entity.getBrokerPort(), entity.getBrokerTLSPort(),
-                entity.getBrokerWebPort(), entity.getMaxMsgSizeInMB(),
-                entity.getQryPriorityId(), entity.enableFlowCtrl(),
-                entity.getGloFlowCtrlRuleCnt(), entity.getGloFlowCtrlRuleInfo(),
-                entity.getClsDefTopicProps())) {
-            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
-                    "Cluster configure not changed!");
-            return result.isSuccess();
-        }
-        // Check whether the configured ports conflict in the record
-        if (WebParameterUtils.isConflictedPortsSet(newEntity.getBrokerPort(),
+        if (!WebParameterUtils.isValidPortsSet(newEntity.getBrokerPort(),
                 newEntity.getBrokerTLSPort(), newEntity.getBrokerWebPort(),
                 strBuff, result)) {
             return result.isSuccess();
         }
         // Store data to persistent
         if (putConfig2Persistent(newEntity, strBuff, result)) {
-            metaDataCache.put(newEntity.getRecordKey(), newEntity);
-            result.setSuccResult(curEntity);
+            metaDataCache.put(newEntity.getRecordKey(), newEntity); // cache the persisted record
         }
         return result.isSuccess();
     }
 
     @Override
-    public ClusterSettingEntity getClusterConfig() {
-        return metaDataCache.get(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
-    }
-
-    @Override
     public boolean delClusterConfig(StringBuilder strBuff, ProcessResult result) {
         ClusterSettingEntity curEntity =
                 metaDataCache.get(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
@@ -116,10 +84,15 @@ public abstract class AbsClusterConfigMapperImpl implements ClusterConfigMapper
         }
         delConfigFromPersistent(strBuff, TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
         metaDataCache.remove(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
-        result.setSuccResult(curEntity);
+        result.setSuccResult(null);
         return true;
     }
 
+    @Override
+    public ClusterSettingEntity getClusterConfig() {
+        return metaDataCache.get(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+    }
+
     /**
      * Clear cached data
      */
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
index f2a03d3..dc8703b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
@@ -39,11 +39,11 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
             LoggerFactory.getLogger(AbsConsumeCtrlMapperImpl.class);
     // configure cache
     private final ConcurrentHashMap<String/* recordKey */, GroupConsumeCtrlEntity>
-            grpConsumeCtrlCache = new ConcurrentHashMap<>();
+            consumeCtrlCache = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String>>
-            grpConsumeCtrlTopicCache = new ConcurrentHashMap<>();
+            topic2RecordCache = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* groupName */, ConcurrentHashSet<String>>
-            grpConsumeCtrlGroupCache = new ConcurrentHashMap<>();
+            group2RecordCache = new ConcurrentHashMap<>();
 
     public AbsConsumeCtrlMapperImpl() {
         // Initial instant
@@ -53,7 +53,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
     public boolean addGroupConsumeCtrlConf(GroupConsumeCtrlEntity entity,
                                            StringBuilder strBuff, ProcessResult result) {
         // Checks whether the record already exists
-        GroupConsumeCtrlEntity curEntity = grpConsumeCtrlCache.get(entity.getRecordKey());
+        GroupConsumeCtrlEntity curEntity = consumeCtrlCache.get(entity.getRecordKey());
         if (curEntity != null) {
             result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
                     strBuff.append("Existed record found for groupName-topicName(")
@@ -72,7 +72,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
     public boolean updGroupConsumeCtrlConf(GroupConsumeCtrlEntity entity,
                                            StringBuilder strBuff, ProcessResult result) {
         // Checks whether the record already exists
-        GroupConsumeCtrlEntity curEntity = grpConsumeCtrlCache.get(entity.getRecordKey());
+        GroupConsumeCtrlEntity curEntity = consumeCtrlCache.get(entity.getRecordKey());
         if (curEntity == null) {
             result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
                     strBuff.append("Not found consume control for through groupName-topicName(")
@@ -93,22 +93,22 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
         // Store data to persistent
         if (putConfig2Persistent(newEntity, strBuff, result)) {
             putRecord2Caches(newEntity);
-            result.setSuccResult(curEntity);
+            result.setSuccResult(null);
         }
         return result.isSuccess();
     }
 
     @Override
-    public boolean isTopicNameInUsed(String topicName) {
+    public boolean isTopicNameInUse(String topicName) {
         ConcurrentHashSet<String> consumeCtrlSet =
-                grpConsumeCtrlTopicCache.get(topicName);
+                topic2RecordCache.get(topicName);
         return (consumeCtrlSet != null && !consumeCtrlSet.isEmpty());
     }
 
     @Override
-    public boolean hasGroupConsumeCtrlConf(String groupName) {
+    public boolean isGroupNameInUse(String groupName) {
         ConcurrentHashSet<String> keySet =
-                grpConsumeCtrlGroupCache.get(groupName);
+                group2RecordCache.get(groupName);
         return (keySet != null && !keySet.isEmpty());
     }
 
@@ -117,14 +117,14 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
                                            StringBuilder strBuff,
                                            ProcessResult result) {
         GroupConsumeCtrlEntity curEntity =
-                grpConsumeCtrlCache.get(recordKey);
+                consumeCtrlCache.get(recordKey);
         if (curEntity == null) {
             result.setSuccResult(null);
             return true;
         }
         delConfigFromPersistent(recordKey, strBuff);
         delRecordFromCaches(recordKey);
-        result.setSuccResult(curEntity);
+        result.setSuccResult(null);
         return true;
     }
 
@@ -139,11 +139,11 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
                 result.setSuccResult(null);
                 return true;
             } else {
-                keySet = grpConsumeCtrlTopicCache.get(topicName);
+                keySet = topic2RecordCache.get(topicName);
             }
         } else {
             if (topicName == null) {
-                keySet = grpConsumeCtrlGroupCache.get(groupName);
+                keySet = group2RecordCache.get(groupName);
             } else {
                 keySet.add(KeyBuilderUtils.buildGroupTopicRecKey(groupName, topicName));
             }
@@ -164,13 +164,13 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
 
     @Override
     public GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String recordKey) {
-        return grpConsumeCtrlCache.get(recordKey);
+        return consumeCtrlCache.get(recordKey);
     }
 
     @Override
     public List<GroupConsumeCtrlEntity> getConsumeCtrlByTopicName(String topicName) {
         ConcurrentHashSet<String> keySet =
-                grpConsumeCtrlTopicCache.get(topicName);
+                topic2RecordCache.get(topicName);
         if (keySet == null || keySet.isEmpty()) {
             return Collections.emptyList();
         }
@@ -180,7 +180,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
             if (recordKey == null) {
                 continue;
             }
-            entity = grpConsumeCtrlCache.get(recordKey);
+            entity = consumeCtrlCache.get(recordKey);
             if (entity != null) {
                 result.add(entity);
             }
@@ -191,14 +191,14 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
     @Override
     public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName) {
         ConcurrentHashSet<String> keySet =
-                grpConsumeCtrlGroupCache.get(groupName);
+                group2RecordCache.get(groupName);
         if (keySet == null || keySet.isEmpty()) {
             return Collections.emptyList();
         }
         GroupConsumeCtrlEntity entity;
         List<GroupConsumeCtrlEntity> result = new ArrayList<>();
         for (String recordKey : keySet) {
-            entity = grpConsumeCtrlCache.get(recordKey);
+            entity = consumeCtrlCache.get(recordKey);
             if (entity != null) {
                 result.add(entity);
             }
@@ -209,7 +209,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
     @Override
     public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(
             String groupName, String topicName) {
-        return grpConsumeCtrlCache.get(
+        return consumeCtrlCache.get(
                 KeyBuilderUtils.buildGroupTopicRecKey(groupName, topicName));
     }
 
@@ -223,7 +223,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
         GroupConsumeCtrlEntity tmpEntity;
         List<GroupConsumeCtrlEntity> itemLst;
         if (totalMatchedSet == null) {
-            for (GroupConsumeCtrlEntity entity : grpConsumeCtrlCache.values()) {
+            for (GroupConsumeCtrlEntity entity : consumeCtrlCache.values()) {
                 if (entity == null || (qryEntry != null && !entity.isMatched(qryEntry))) {
                     continue;
                 }
@@ -233,7 +233,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
             }
         } else {
             for (String recKey : totalMatchedSet) {
-                tmpEntity = grpConsumeCtrlCache.get(recKey);
+                tmpEntity = consumeCtrlCache.get(recKey);
                 if (tmpEntity == null || (qryEntry != null && !tmpEntity.isMatched(qryEntry))) {
                     continue;
                 }
@@ -249,9 +249,9 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
     public List<GroupConsumeCtrlEntity> getGroupConsumeCtrlConf(GroupConsumeCtrlEntity qryEntity) {
         List<GroupConsumeCtrlEntity> retEntities = new ArrayList<>();
         if (qryEntity == null) {
-            retEntities.addAll(grpConsumeCtrlCache.values());
+            retEntities.addAll(consumeCtrlCache.values());
         } else {
-            for (GroupConsumeCtrlEntity entity : grpConsumeCtrlCache.values()) {
+            for (GroupConsumeCtrlEntity entity : consumeCtrlCache.values()) {
                 if (entity != null && entity.isMatched(qryEntity)) {
                     retEntities.add(entity);
                 }
@@ -270,7 +270,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
         if (groupSet != null && !groupSet.isEmpty()) {
             groupKeySet = new HashSet<>();
             for (String group : groupSet) {
-                recSet = grpConsumeCtrlGroupCache.get(group);
+                recSet = group2RecordCache.get(group);
                 if (recSet != null && !recSet.isEmpty()) {
                     groupKeySet.addAll(recSet);
                 }
@@ -283,7 +283,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
         if (topicSet != null && !topicSet.isEmpty()) {
             topicKeySet = new HashSet<>();
             for (String topic : topicSet) {
-                recSet = grpConsumeCtrlTopicCache.get(topic);
+                recSet = topic2RecordCache.get(topic);
                 if (recSet != null && !recSet.isEmpty()) {
                     topicKeySet.addAll(recSet);
                 }
@@ -316,9 +316,9 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
      * Clear cached data
      */
     protected void clearCachedData() {
-        grpConsumeCtrlTopicCache.clear();
-        grpConsumeCtrlGroupCache.clear();
-        grpConsumeCtrlCache.clear();
+        topic2RecordCache.clear();
+        group2RecordCache.clear();
+        consumeCtrlCache.clear();
     }
 
     /**
@@ -327,23 +327,23 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
      * @param entity  need added or updated entity
      */
     protected void putRecord2Caches(GroupConsumeCtrlEntity entity) {
-        grpConsumeCtrlCache.put(entity.getRecordKey(), entity);
+        consumeCtrlCache.put(entity.getRecordKey(), entity);
         // add topic index map
         ConcurrentHashSet<String> keySet =
-                grpConsumeCtrlTopicCache.get(entity.getTopicName());
+                topic2RecordCache.get(entity.getTopicName());
         if (keySet == null) {
             ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
-            keySet = grpConsumeCtrlTopicCache.putIfAbsent(entity.getTopicName(), tmpSet);
+            keySet = topic2RecordCache.putIfAbsent(entity.getTopicName(), tmpSet);
             if (keySet == null) {
                 keySet = tmpSet;
             }
         }
         keySet.add(entity.getRecordKey());
         // add group index map
-        keySet = grpConsumeCtrlGroupCache.get(entity.getGroupName());
+        keySet = group2RecordCache.get(entity.getGroupName());
         if (keySet == null) {
             ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
-            keySet = grpConsumeCtrlGroupCache.putIfAbsent(entity.getGroupName(), tmpSet);
+            keySet = group2RecordCache.putIfAbsent(entity.getGroupName(), tmpSet);
             if (keySet == null) {
                 keySet = tmpSet;
             }
@@ -378,25 +378,25 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
      */
     private void delRecordFromCaches(String recordKey) {
         GroupConsumeCtrlEntity curEntity =
-                grpConsumeCtrlCache.remove(recordKey);
+                consumeCtrlCache.remove(recordKey);
         if (curEntity == null) {
             return;
         }
         // add topic index
         ConcurrentHashSet<String> keySet =
-                grpConsumeCtrlTopicCache.get(curEntity.getTopicName());
+                topic2RecordCache.get(curEntity.getTopicName());
         if (keySet != null) {
             keySet.remove(recordKey);
             if (keySet.isEmpty()) {
-                grpConsumeCtrlTopicCache.remove(curEntity.getTopicName());
+                topic2RecordCache.remove(curEntity.getTopicName());
             }
         }
         // delete group index
-        keySet = grpConsumeCtrlGroupCache.get(curEntity.getGroupName());
+        keySet = group2RecordCache.get(curEntity.getGroupName());
         if (keySet != null) {
             keySet.remove(recordKey);
             if (keySet.isEmpty()) {
-                grpConsumeCtrlGroupCache.remove(curEntity.getGroupName());
+                group2RecordCache.remove(curEntity.getGroupName());
             }
         }
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsGroupResCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsGroupResCtrlMapperImpl.java
index 25fbe43..dc3edea 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsGroupResCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsGroupResCtrlMapperImpl.java
@@ -83,7 +83,7 @@ public abstract class AbsGroupResCtrlMapperImpl implements GroupResCtrlMapper {
         // Store data to persistent
         if (putConfig2Persistent(newEntity, strBuff, result)) {
             groupBaseCtrlCache.put(newEntity.getGroupName(), newEntity);
-            result.setSuccResult(curEntity);
+            result.setSuccResult(null);
         }
         return result.isSuccess();
     }
@@ -98,7 +98,7 @@ public abstract class AbsGroupResCtrlMapperImpl implements GroupResCtrlMapper {
         }
         delConfigFromPersistent(groupName, strBuff);
         groupBaseCtrlCache.remove(groupName);
-        result.setSuccResult(curEntity);
+        result.setSuccResult(null);
         return true;
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaStoreMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaStoreMapperImpl.java
new file mode 100644
index 0000000..933cd86
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaStoreMapperImpl.java
@@ -0,0 +1,1214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.metamanage.metastore.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.inlong.tubemq.server.common.utils.RowLock;
+import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.MetaStoreMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.BrokerConfigMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.ClusterConfigMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.ConsumeCtrlMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupResCtrlMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.TopicCtrlMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AbsMetaStoreMapperImpl implements MetaStoreMapper {
+    // shared logger for this class
+    protected static final Logger logger =
+            LoggerFactory.getLogger(AbsMetaStoreMapperImpl.class);
+    // service status:
+    // 0 stopped, 1 starting, 2 started, 3 stopping
+    private final AtomicInteger srvStatus = new AtomicInteger(0);
+    // the observers focusing on active-standby switching
+    private final List<AliveObserver> eventObservers = new ArrayList<>();
+
+    // row lock taken around each add/update/delete operation below,
+    // keyed by the UTF-8 bytes of the record's identifier
+    private final RowLock metaRowLock;
+    // built-in default cluster setting; returned by getClusterDefSetting()
+    // when nothing is stored and the caller does not require a stored record
+    private static final ClusterSettingEntity defClusterSetting =
+            new ClusterSettingEntity().fillDefaultValue();
+    // NOTE(review): none of the mapper fields below is assigned in the visible
+    // part of this class - confirm where they are initialized.
+    // mapper of the cluster default setting
+    private ClusterConfigMapper clusterConfigMapper;
+    // mapper of the broker configure
+    private BrokerConfigMapper brokerConfigMapper;
+    // mapper of the topic deployment configure
+    private TopicDeployMapper topicDeployMapper;
+    // mapper of the topic control configure
+    private TopicCtrlMapper topicCtrlMapper;
+    // mapper of the group resource control configure
+    private GroupResCtrlMapper groupResCtrlMapper;
+    // mapper of the group consume control configure
+    private ConsumeCtrlMapper consumeCtrlMapper;
+
+    /**
+     * Create the mapper and its row lock.
+     *
+     * @param rowLockWaiDurMs  passed as the second argument to the RowLock
+     *                         named "MetaData-RowLock"; presumably the lock wait
+     *                         duration in milliseconds - TODO confirm against RowLock
+     */
+    public AbsMetaStoreMapperImpl(int rowLockWaiDurMs) {
+        this.metaRowLock =
+                new RowLock("MetaData-RowLock", rowLockWaiDurMs);
+    }
+
+    /**
+     * Add the cluster default setting when none is stored, otherwise update it.
+     *
+     * The whole read-modify-write runs under the row lock keyed "clusterConfig#".
+     * When a record already exists and the supplied values change nothing, the
+     * result is set to DERR_UNCHANGED and nothing is written.
+     *
+     * @param opEntity        the operation info (operator, data version id)
+     * @param brokerPort      the broker port value to apply
+     * @param brokerTlsPort   the broker TLS port value to apply
+     * @param brokerWebPort   the broker web port value to apply
+     * @param maxMsgSizeMB    the max message size (MB) value to apply
+     * @param qryPriorityId   the query priority id value to apply
+     * @param flowCtrlEnable  the flow control switch value to apply
+     * @param flowRuleCnt     the flow control rule count value to apply
+     * @param flowCtrlInfo    the flow control rule content to apply
+     * @param topicProps      the default topic properties to apply
+     * @param strBuff         the string buffer used for building messages;
+     *                        emptied after the success log is written
+     * @param result          the process result return
+     * @return  the value of result.isSuccess() after the operation
+     */
+    @Override
+    public boolean addOrUpdClusterDefSetting(BaseEntity opEntity,
+                                             int brokerPort, int brokerTlsPort,
+                                             int brokerWebPort, int maxMsgSizeMB,
+                                             int qryPriorityId, Boolean flowCtrlEnable,
+                                             int flowRuleCnt, String flowCtrlInfo,
+                                             TopicPropGroup topicProps,
+                                             StringBuilder strBuff, ProcessResult result) {
+        Integer lid = null;
+        boolean isAddOp = false;
+        String printPrefix = "[updClusterConfig], ";
+        ClusterSettingEntity curEntity;
+        ClusterSettingEntity newEntity;
+        try {
+            // lock clusterConfig meta-lock
+            lid = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8("clusterConfig#"), true);
+            // build add or update data
+            curEntity = clusterConfigMapper.getClusterConfig();
+            if (curEntity == null) {
+                // no stored record: start from defaults, then apply the supplied values
+                isAddOp = true;
+                printPrefix = "[addClusterConfig], ";
+                newEntity = new ClusterSettingEntity(opEntity);
+                newEntity.fillDefaultValue();
+                newEntity.updModifyInfo(opEntity.getDataVerId(), brokerPort,
+                        brokerTlsPort, brokerWebPort, maxMsgSizeMB, qryPriorityId,
+                        flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps);
+            } else {
+                // work on a clone so the stored record stays untouched until it is written
+                newEntity = curEntity.clone();
+                newEntity.updBaseModifyInfo(opEntity);
+                if (!newEntity.updModifyInfo(opEntity.getDataVerId(), brokerPort,
+                        brokerTlsPort, brokerWebPort, maxMsgSizeMB, qryPriorityId,
+                        flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps)) {
+                    result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                            "Cluster configure not changed!");
+                    return result.isSuccess();
+                }
+            }
+            // add or update data to storage
+            clusterConfigMapper.addUpdClusterConfig(newEntity, strBuff, result);
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            // the lock is released on every exit path, including the early returns above
+            if (lid != null) {
+                metaRowLock.releaseRowLock(lid);
+            }
+        }
+        if (result.isSuccess()) {
+            // print operation result
+            if (isAddOp) {
+                strBuff.append(printPrefix).append(newEntity.getCreateUser())
+                        .append(" added cluster configure: ").append(newEntity);
+            } else {
+                strBuff.append(printPrefix).append(newEntity.getModifyUser())
+                        .append(" updated cluster configure: from ").append(curEntity)
+                        .append(" to ").append(newEntity);
+            }
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Get the cluster default setting.
+     *
+     * @param isMustConf  when true only the stored record is returned (possibly null);
+     *                    when false the built-in default is returned if nothing is stored
+     * @return  the stored setting, the built-in default, or null as described above
+     */
+    @Override
+    public ClusterSettingEntity getClusterDefSetting(boolean isMustConf) {
+        ClusterSettingEntity storedSetting = clusterConfigMapper.getClusterConfig();
+        // a stored record always wins; a strict caller gets it even when it is null
+        if (storedSetting != null || isMustConf) {
+            return storedSetting;
+        }
+        return defClusterSetting;
+    }
+
+    // //////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Add or update a broker configure from individual field values.
+     *
+     * Builds a BrokerConfEntity from the arguments and delegates to the
+     * entity-based overload.
+     *
+     * @param isAddOp        true for an add operation, false for an update
+     * @param opEntity       the operation info (operator, data version id)
+     * @param brokerId       the broker id
+     * @param brokerIp       the broker ip
+     * @param brokerPort     the broker port value to apply
+     * @param brokerTlsPort  the broker TLS port value to apply
+     * @param brokerWebPort  the broker web port value to apply
+     * @param regionId       the region id value to apply
+     * @param groupId        the group id value to apply
+     * @param mngStatus      the manage status value to apply
+     * @param topicProps     the default topic properties to apply
+     * @param strBuff        the string buffer used for building messages
+     * @param result         the process result return
+     * @return  true if success otherwise false
+     */
+    @Override
+    public boolean addOrUpdBrokerConfig(boolean isAddOp, BaseEntity opEntity,
+                                 int brokerId, String brokerIp, int brokerPort,
+                                 int brokerTlsPort, int brokerWebPort,
+                                 int regionId, int groupId, ManageStatus mngStatus,
+                                 TopicPropGroup topicProps, StringBuilder strBuff,
+                                 ProcessResult result) {
+        BrokerConfEntity entity =
+                new BrokerConfEntity(opEntity, brokerId, brokerIp);
+        entity.updModifyInfo(opEntity.getDataVerId(), brokerPort,
+                brokerTlsPort, brokerWebPort, regionId, groupId, mngStatus, topicProps);
+        return addOrUpdBrokerConfig(isAddOp, entity, strBuff, result);
+    }
+
+    /**
+     * Add or update a broker configure record.
+     *
+     * The mapper call runs under the row lock keyed by the broker id. The record
+     * is read back afterwards (and, for an update, also before) only to build
+     * the operation log.
+     *
+     * @param isAddOp  true for an add operation, false for an update
+     * @param entity   the broker configure to add or update
+     * @param strBuff  the string buffer used for building messages;
+     *                 emptied after the success log is written
+     * @param result   the process result return
+     * @return  the value of result.isSuccess() after the operation
+     */
+    @Override
+    public boolean addOrUpdBrokerConfig(boolean isAddOp, BrokerConfEntity entity,
+                                        StringBuilder strBuff, ProcessResult result) {
+        Integer lid = null;
+        BrokerConfEntity curEntity = null;
+        BrokerConfEntity newEntity;
+        String printPrefix = "[addBrokerConf], ";
+        // execute add or update operation
+        try {
+            // lock brokerId meta-lock
+            lid = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(String.valueOf(entity.getBrokerId())), true);
+            if (isAddOp) {
+                brokerConfigMapper.addBrokerConf(entity, strBuff, result);
+            } else {
+                printPrefix = "[updBrokerConf], ";
+                // snapshot the record before the update for the "from ... to ..." log
+                curEntity = brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
+                brokerConfigMapper.updBrokerConf(entity, strBuff, result);
+            }
+            // read back the record as stored after the operation
+            newEntity = brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            if (lid != null) {
+                metaRowLock.releaseRowLock(lid);
+            }
+        }
+        // print log to file
+        if (result.isSuccess()) {
+            if (isAddOp) {
+                strBuff.append(printPrefix).append(entity.getCreateUser())
+                        .append(" added broker configure: ").append(newEntity);
+            } else {
+                strBuff.append(printPrefix).append(entity.getModifyUser())
+                        .append(" updated broker configure from ").append(curEntity)
+                        .append(" to ").append(newEntity);
+            }
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Change the manage status of a broker configure record.
+     *
+     * The mapper call runs under the row lock keyed by the broker id; the record
+     * is read before and after the change to build the operation log.
+     *
+     * @param opEntity      the operation info
+     * @param brokerId      the id of the broker whose status is changed
+     * @param newMngStatus  the manage status to set
+     * @param strBuff       the string buffer used for building messages;
+     *                      emptied after the success log is written
+     * @param result        the process result return
+     * @return  the value of result.isSuccess() after the operation
+     */
+    @Override
+    public boolean changeBrokerConfStatus(BaseEntity opEntity,
+                                          int brokerId, ManageStatus newMngStatus,
+                                          StringBuilder strBuff, ProcessResult result) {
+        Integer lid = null;
+        BrokerConfEntity curEntity;
+        BrokerConfEntity newEntity;
+        String printPrefix = "[updBrokerConf], ";
+        // execute update operation
+        try {
+            // lock brokerId meta-lock
+            lid = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(String.valueOf(brokerId)), true);
+            // snapshot before, apply the status change, then read back the stored record
+            curEntity = brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
+            brokerConfigMapper.updBrokerMngStatus(opEntity,
+                    brokerId, newMngStatus, strBuff, result);
+            newEntity = brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            if (lid != null) {
+                metaRowLock.releaseRowLock(lid);
+            }
+        }
+        // print log to file
+        if (result.isSuccess()) {
+            strBuff.append(printPrefix).append(opEntity.getModifyUser())
+                    .append(" updated broker configure from ").append(curEntity)
+                    .append(" to ").append(newEntity);
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Delete a broker configure record.
+     *
+     * Runs under the row lock keyed by the broker id. Outcomes:
+     * - no record for the broker id: success, nothing deleted;
+     * - the record's manage status is online: fails with DERR_ILLEGAL_STATUS;
+     * - the broker still has configured topics and rsvData is false:
+     *   fails with DERR_UNCLEANED;
+     * - the broker still has configured topics and rsvData is true: the topic
+     *   deploy configures of the broker are deleted first, then the broker record.
+     *
+     * NOTE(review): topic deploy configures are deleted when rsvData is TRUE. If
+     * the name means "reserve data" this reads inverted - confirm with the
+     * interface documentation and the callers.
+     *
+     * @param operator  the operator, used in the operation log
+     * @param brokerId  the id of the broker to delete
+     * @param rsvData   controls the handling of remaining topic deploy
+     *                  configures, see above
+     * @param strBuff   the string buffer used for building messages
+     * @param result    the process result return
+     * @return  the value of result.isSuccess() after the operation
+     */
+    @Override
+    public boolean delBrokerConfInfo(String operator, int brokerId, boolean rsvData,
+                                     StringBuilder strBuff, ProcessResult result) {
+        Integer lid = null;
+        BrokerConfEntity curEntity;
+        String printPrefix = "[delBrokerConf], ";
+        // execute delete operation
+        try {
+            // lock brokerId meta-lock
+            lid = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(String.valueOf(brokerId)), true);
+            // get current broker configure
+            curEntity = brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
+            if (curEntity == null) {
+                // nothing to delete is treated as success
+                result.setSuccResult(null);
+                return result.isSuccess();
+            }
+            // check broker's manage status
+            if (curEntity.getManageStatus().isOnlineStatus()) {
+                result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+                        strBuff.append("Illegal manage status, please offline the broker(")
+                                .append(WebFieldDef.BROKERID.name).append("=")
+                                .append(curEntity.getBrokerId()).append(") first!").toString());
+                strBuff.delete(0, strBuff.length());
+                return result.isSuccess();
+            }
+            // check topic deploy status
+            if (!topicDeployMapper.getConfiguredTopicInfo(curEntity.getBrokerId()).isEmpty()) {
+                if (rsvData) {
+                    // remove the broker's topic deploy configures before the broker itself
+                    if (!topicDeployMapper.delTopicConfByBrokerId(brokerId, strBuff, result)) {
+                        return result.isSuccess();
+                    }
+                    strBuff.append("[delTopicDeployByBrokerId], ").append(operator)
+                            .append(" deleted topic deploy configure: ").append(brokerId);
+                    logger.info(strBuff.toString());
+                    strBuff.delete(0, strBuff.length());
+                } else {
+                    result.setFailResult(DataOpErrCode.DERR_UNCLEANED.getCode(),
+                            strBuff.append("Illegal operate conditions, the broker(")
+                                    .append(curEntity.getBrokerId())
+                                    .append(")'s topic deploy configure uncleaned, please delete them first!")
+                                    .toString());
+                    strBuff.delete(0, strBuff.length());
+                    return result.isSuccess();
+                }
+            }
+            // execute delete operation
+            brokerConfigMapper.delBrokerConf(brokerId, strBuff, result);
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            if (lid != null) {
+                metaRowLock.releaseRowLock(lid);
+            }
+        }
+        // print log to file
+        if (result.isSuccess()) {
+            strBuff.append(printPrefix).append(operator)
+                    .append(" deleted broker configure: ").append(curEntity);
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Query broker configures by a query entity; delegates to the broker
+     * configure mapper.
+     *
+     * @param qryEntity  the query condition entity
+     * @return  the result of the mapper query, keyed by Integer
+     *          (presumably the broker id - confirm against BrokerConfigMapper)
+     */
+    @Override
+    public Map<Integer, BrokerConfEntity> getBrokerConfInfo(BrokerConfEntity qryEntity) {
+        return brokerConfigMapper.getBrokerConfInfo(qryEntity);
+    }
+
+    /**
+     * Query broker configures by id set, ip set and a query entity; delegates to
+     * the broker configure mapper.
+     *
+     * @param brokerIdSet  the broker id set to query
+     * @param brokerIpSet  the broker ip set to query
+     * @param qryEntity    the query condition entity
+     * @return  the result of the mapper query, keyed by Integer
+     *          (presumably the broker id - confirm against BrokerConfigMapper)
+     */
+    @Override
+    public Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
+                                                            Set<String> brokerIpSet,
+                                                            BrokerConfEntity qryEntity) {
+        return brokerConfigMapper.getBrokerConfInfo(brokerIdSet, brokerIpSet, qryEntity);
+    }
+
+    /**
+     * Get the broker configure of a broker id; delegates to the broker
+     * configure mapper.
+     *
+     * @param brokerId  the broker id to look up
+     * @return  the mapper's result; other methods in this class treat null
+     *          as "no record"
+     */
+    @Override
+    public BrokerConfEntity getBrokerConfByBrokerId(int brokerId) {
+        return brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
+    }
+
+    /**
+     * Get the broker configure of a broker ip; delegates to the broker
+     * configure mapper.
+     *
+     * @param brokerIp  the broker ip to look up
+     * @return  the mapper's result for the given ip
+     */
+    @Override
+    public BrokerConfEntity getBrokerConfByBrokerIp(String brokerIp) {
+        return brokerConfigMapper.getBrokerConfByBrokerIp(brokerIp);
+    }
+
+    // ///////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Add or update a topic control configure from individual field values.
+     *
+     * Builds a TopicCtrlEntity from the arguments and delegates to the
+     * entity-based overload.
+     *
+     * NOTE(review): unlike the sibling methods this one carries no @Override -
+     * confirm whether MetaStoreMapper declares it.
+     *
+     * @param isAddOp          true for an add operation, false for an update
+     * @param opEntity         the operation info (operator, data version id)
+     * @param topicName        the topic name
+     * @param topicNameId      the topic name id value to apply
+     * @param enableTopicAuth  the topic authorization switch value to apply
+     * @param maxMsgSizeInMB   the max message size (MB) value to apply
+     * @param sBuffer          the string buffer used for building messages
+     * @param result           the process result return
+     * @return  true if success otherwise false
+     */
+    public boolean addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
+                                         String topicName, int topicNameId,
+                                         Boolean enableTopicAuth, int maxMsgSizeInMB,
+                                         StringBuilder sBuffer, ProcessResult result) {
+        TopicCtrlEntity entity =
+                new TopicCtrlEntity(opEntity, topicName);
+        entity.updModifyInfo(opEntity.getDataVerId(),
+                topicNameId, maxMsgSizeInMB, enableTopicAuth);
+        return addOrUpdTopicCtrlConf(isAddOp, entity, sBuffer, result);
+    }
+
+    /**
+     * Add or update a topic control configure with strict consistency checking:
+     * an add fails if the record exists, an update fails if it does not.
+     *
+     * @param isAddOp  true for an add operation, false for an update
+     * @param entity   the topic control configure to add or update
+     * @param strBuff  the string buffer used for building messages
+     * @param result   the process result return
+     * @return  true if success otherwise false
+     */
+    @Override
+    public boolean addOrUpdTopicCtrlConf(boolean isAddOp, TopicCtrlEntity entity,
+                                         StringBuilder strBuff, ProcessResult result) {
+        return innAddOrUpdTopicCtrlConf(true, isAddOp, entity, strBuff, result);
+    }
+
+    /**
+     * Insert a topic control configure built from a topic name and its
+     * authorization switch: added when absent, updated when present.
+     *
+     * The topic name id and the max message size are passed to the entity as
+     * TBaseConstants.META_VALUE_UNDEFINED.
+     *
+     * @param opEntity         the operation info (operator, data version id)
+     * @param topicName        the topic name
+     * @param enableTopicAuth  the topic authorization switch value to apply
+     * @param strBuff          the string buffer used for building messages
+     * @param result           the process result return
+     * @return  true if success otherwise false
+     */
+    @Override
+    public boolean insertTopicCtrlConf(BaseEntity opEntity,
+                                       String topicName, Boolean enableTopicAuth,
+                                       StringBuilder strBuff, ProcessResult result) {
+        TopicCtrlEntity entity =
+                new TopicCtrlEntity(opEntity, topicName);
+        entity.updModifyInfo(opEntity.getDataVerId(),
+                TBaseConstants.META_VALUE_UNDEFINED,
+                TBaseConstants.META_VALUE_UNDEFINED, enableTopicAuth);
+        // (false, false): no existence check - add when absent, update when present
+        return innAddOrUpdTopicCtrlConf(false, false, entity, strBuff, result);
+    }
+
+    /**
+     * Insert a topic control configure: added when absent, updated when present.
+     *
+     * @param entity   the topic control configure to insert
+     * @param strBuff  the string buffer used for building messages
+     * @param result   the process result return
+     * @return  true if success otherwise false
+     */
+    @Override
+    public boolean insertTopicCtrlConf(TopicCtrlEntity entity,
+                                       StringBuilder strBuff, ProcessResult result) {
+        // (false, false): no existence check - add when absent, update when present
+        return innAddOrUpdTopicCtrlConf(false, false, entity, strBuff, result);
+    }
+
+    /**
+     * Delete a topic control configure record.
+     *
+     * Runs under the row lock keyed by the topic name. Outcomes:
+     * - no record for the topic: success, nothing deleted;
+     * - the topic is still deployed, or still referenced by a consume control
+     *   record: fails with DERR_ILLEGAL_STATUS;
+     * - otherwise the record is deleted through the topic control mapper.
+     *
+     * @param operator   the operator, used in the operation log
+     * @param topicName  the name of the topic whose control configure is deleted
+     * @param strBuff    the string buffer used for building messages
+     * @param result     the process result return
+     * @return  the value of result.isSuccess() after the operation
+     */
+    @Override
+    public boolean delTopicCtrlConf(String operator, String topicName,
+                                    StringBuilder strBuff, ProcessResult result) {
+        Integer lid = null;
+        TopicCtrlEntity curEntity;
+        String printPrefix = "[delTopicCtrlConf], ";
+        // execute delete operation
+        try {
+            // lock topicName meta-lock
+            lid = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(topicName), true);
+            curEntity = topicCtrlMapper.getTopicCtrlConf(topicName);
+            if (curEntity == null) {
+                // nothing to delete is treated as success
+                result.setSuccResult(null);
+                return result.isSuccess();
+            }
+            // check topic use status
+            if (topicDeployMapper.isTopicDeployed(topicName)) {
+                result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+                        strBuff.append("TopicName ").append(topicName)
+                                .append(" is in use, please delete deploy configure first!")
+                                .toString());
+                strBuff.delete(0, strBuff.length());
+                return result.isSuccess();
+            }
+            if (consumeCtrlMapper.isTopicNameInUse(topicName)) {
+                result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+                        strBuff.append("TopicName ").append(topicName)
+                                .append(" is in use, please delete the consume control first!")
+                                .toString());
+                strBuff.delete(0, strBuff.length());
+                return result.isSuccess();
+            }
+            topicCtrlMapper.delTopicCtrlConf(topicName, strBuff, result);
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            if (lid != null) {
+                metaRowLock.releaseRowLock(lid);
+            }
+        }
+        // print operation log
+        if (result.isSuccess()) {
+            strBuff.append(printPrefix).append(operator)
+                    .append(" deleted topic control configure: ")
+                    .append(curEntity);
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Get the topic control configure of a topic; delegates to the topic
+     * control mapper.
+     *
+     * @param topicName  the topic name to look up
+     * @return  the mapper's result; other methods in this class treat null
+     *          as "no record"
+     */
+    @Override
+    public TopicCtrlEntity getTopicCtrlByTopicName(String topicName) {
+        return this.topicCtrlMapper.getTopicCtrlConf(topicName);
+    }
+
+    /**
+     * Get the max message size (MB) that applies to a topic.
+     *
+     * The value of the topic's control configure is used when such a record
+     * exists; otherwise the cluster default setting's value is returned.
+     *
+     * @param topicName  the topic name to look up
+     * @return  the max message size in MB
+     */
+    @Override
+    public int getTopicMaxMsgSizeInMB(String topicName) {
+        // cluster-level value, used when the topic has no control record
+        int clusterLimitInMB = getClusterDefSetting(false).getMaxMsgSizeInMB();
+        TopicCtrlEntity topicCtrl = topicCtrlMapper.getTopicCtrlConf(topicName);
+        return (topicCtrl == null) ? clusterLimitInMB : topicCtrl.getMaxMsgSizeInMB();
+    }
+
+    /**
+     * Query topic control configures by a query entity; delegates to the topic
+     * control mapper.
+     *
+     * @param qryEntity  the query condition entity
+     * @return  the list returned by the mapper query
+     */
+    @Override
+    public List<TopicCtrlEntity> queryTopicCtrlConf(TopicCtrlEntity qryEntity) {
+        return topicCtrlMapper.getTopicCtrlConf(qryEntity);
+    }
+
+    /**
+     * Query topic control configures by topic name set and a query entity;
+     * delegates to the topic control mapper.
+     *
+     * @param topicNameSet  the topic name set to query
+     * @param qryEntity     the query condition entity
+     * @return  the result of the mapper query, keyed by String
+     *          (presumably the topic name - confirm against TopicCtrlMapper)
+     */
+    @Override
+    public Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
+                                                         TopicCtrlEntity qryEntity) {
+        return topicCtrlMapper.getTopicCtrlConf(topicNameSet, qryEntity);
+    }
+
+    /**
+     * Add a topic control configure for the topic unless one already exists.
+     *
+     * The new record takes its max message size from the cluster default
+     * setting; an already existing record is left untouched and the call
+     * still reports success.
+     *
+     * @param opEntity  the operation info
+     * @param topicName the name of the topic to add
+     * @param strBuff   the string buffer used for building messages
+     * @param result    the process result return
+     * @return true if success otherwise false
+     */
+    private boolean addTopicCtrlConfIfAbsent(BaseEntity opEntity, String topicName,
+                                             StringBuilder strBuff, ProcessResult result) {
+        ClusterSettingEntity clusterDefaults = getClusterDefSetting(false);
+        // fall back to the minimum allowed size when no cluster setting is available
+        int sizeLimitInMB = (clusterDefaults == null)
+                ? TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB
+                : clusterDefaults.getMaxMsgSizeInMB();
+        TopicCtrlEntity newCtrlEntity = new TopicCtrlEntity(opEntity, topicName,
+                TBaseConstants.META_VALUE_UNDEFINED, sizeLimitInMB);
+        // (false, true): add only; an existing record yields success without change
+        return innAddOrUpdTopicCtrlConf(false, true, newCtrlEntity, strBuff, result);
+    }
+
+    /**
+     * Add or update topic control configure info
+     *
+     * The operation runs under the row lock keyed by the topic name. The two
+     * flags select the behavior as follows:
+     * - chkConsistent = true:  isAddOpOrOnlyAdd is the operation type; an add
+     *   fails with DERR_EXISTED when the record exists, an update fails with
+     *   DERR_NOT_EXIST when it does not;
+     * - chkConsistent = false, isAddOpOrOnlyAdd = true:  add only; an existing
+     *   record is left untouched and the call succeeds;
+     * - chkConsistent = false, isAddOpOrOnlyAdd = false: add when absent,
+     *   update when present.
+     *
+     * @param chkConsistent     whether the record's existence must match the
+     *                          requested operation type
+     * @param isAddOpOrOnlyAdd  the operation type (add or update) when
+     *                          chkConsistent is true, otherwise whether only
+     *                          adding is allowed
+     * @param entity            the entity need to operation
+     * @param strBuff           the string buffer; emptied after it was used
+     *                          for a failure message or the success log
+     * @param result            the process result return
+     * @return true if success otherwise false
+     */
+    private boolean innAddOrUpdTopicCtrlConf(boolean chkConsistent, boolean isAddOpOrOnlyAdd,
+                                             TopicCtrlEntity entity, StringBuilder strBuff,
+                                             ProcessResult result) {
+        Integer lid = null;
+        TopicCtrlEntity curEntity;
+        TopicCtrlEntity newEntity;
+        boolean addRecord = true;
+        String printPrefix = "[addTopicCtrlConf], ";
+        // execute add or update operation
+        try {
+            // lock topicName meta-lock
+            lid = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(entity.getTopicName()), true);
+            // get current record and judge execute condition
+            curEntity = topicCtrlMapper.getTopicCtrlConf(entity.getTopicName());
+            if (curEntity == null) {
+                // a strict update needs an existing record
+                if (chkConsistent && !isAddOpOrOnlyAdd) {
+                    result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                            strBuff.append("Not found topic control configure for topicName(")
+                                    .append(entity.getTopicName()).append(")!").toString());
+                    strBuff.delete(0, strBuff.length());
+                    return result.isSuccess();
+                }
+                topicCtrlMapper.addTopicCtrlConf(entity, strBuff, result);
+            } else {
+                if (isAddOpOrOnlyAdd) {
+                    if (chkConsistent) {
+                        // a strict add must not find an existing record
+                        result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                                strBuff.append("Existed record found for topicName(")
+                                        .append(entity.getTopicName()).append(")!").toString());
+                        strBuff.delete(0, strBuff.length());
+                    } else {
+                        // add-if-absent: the existing record is kept as is
+                        result.setSuccResult(null);
+                    }
+                    return result.isSuccess();
+                }
+                addRecord = false;
+                // fixed misspelled prefix ("udp" -> "upd") to match the other update logs
+                printPrefix = "[updTopicCtrlConf], ";
+                topicCtrlMapper.updTopicCtrlConf(entity, strBuff, result);
+            }
+            // read back the record as stored after the operation, for the log
+            newEntity = topicCtrlMapper.getTopicCtrlConf(entity.getTopicName());
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            if (lid != null) {
+                metaRowLock.releaseRowLock(lid);
+            }
+        }
+        // print log to file
+        if (result.isSuccess()) {
+            if (addRecord) {
+                strBuff.append(printPrefix).append(entity.getCreateUser())
+                        .append(" added topic control configure: ").append(newEntity);
+            } else {
+                strBuff.append(printPrefix).append(entity.getModifyUser())
+                        .append(" updated topic control configure from ")
+                        .append(curEntity).append(" to ").append(newEntity);
+            }
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    // ///////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Build a topic deploy entity from the given fields, then add or update it.
+     *
+     * @param isAddOp        true to add the record, false to update it
+     * @param opEntity       the operation info; also supplies the data version id
+     * @param brokerId       the broker id of the deploy record
+     * @param topicName      the topic name of the deploy record
+     * @param deployStatus   the topic status to set
+     * @param topicPropInfo  the topic property group to set
+     * @param strBuff        the string buffer
+     * @param result         the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean addOrUpdTopicDeployInfo(boolean isAddOp, BaseEntity opEntity,
+                                           int brokerId, String topicName,
+                                           TopicStatus deployStatus,
+                                           TopicPropGroup topicPropInfo,
+                                           StringBuilder strBuff, ProcessResult result) {
+        TopicDeployEntity deployEntity = new TopicDeployEntity(opEntity, brokerId, topicName);
+        // the two numeric fields and the one reference field not supplied by the caller
+        // are passed as undefined / null
+        deployEntity.updModifyInfo(
+                opEntity.getDataVerId(),
+                TBaseConstants.META_VALUE_UNDEFINED,
+                TBaseConstants.META_VALUE_UNDEFINED,
+                null,
+                deployStatus,
+                topicPropInfo);
+        return addOrUpdTopicDeployInfo(isAddOp, deployEntity, strBuff, result);
+    }
+
+    /**
+     * Add or update a topic deploy configure record.
+     *
+     * The topic's control configure is added first if it is absent. The change is
+     * then applied while holding the topic-name row lock and, nested inside it,
+     * the broker-id row lock. The broker configure must exist. An add fails if a
+     * deploy record already exists for the record key; an update fails if none exists.
+     *
+     * @param isAddOp   true to add the record, false to update it
+     * @param entity    the topic deploy entity to store
+     * @param strBuff   the string buffer
+     * @param result    the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean addOrUpdTopicDeployInfo(boolean isAddOp, TopicDeployEntity entity,
+                                           StringBuilder strBuff, ProcessResult result) {
+        TopicDeployEntity curEntity;
+        TopicDeployEntity newEntity;
+        String printPrefix = "[addTopicDeployConf], ";
+        Integer topicLockId = null;
+        Integer brokerLockId = null;
+        // add topic control configure
+        // (the return value is not checked here; result is overwritten below)
+        addTopicCtrlConfIfAbsent(entity, entity.getTopicName(), strBuff, result);
+        // execute add or update operation
+        try {
+            // lock topicName meta-lock
+            topicLockId = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(entity.getTopicName()), true);
+            try {
+                // lock brokerId meta-lock
+                brokerLockId = metaRowLock.getLock(null,
+                        StringUtils.getBytesUtf8(String.valueOf(entity.getBrokerId())), true);
+                // check broker configure exist
+                BrokerConfEntity brokerEntity =
+                        brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
+                if (brokerEntity == null) {
+                    result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                            strBuff.append("Not found broker configure by brokerId=")
+                                    .append(entity.getBrokerId())
+                                    .append(", please create the broker's configure first!").toString());
+                    strBuff.delete(0, strBuff.length());
+                    return result.isSuccess();
+                }
+                // check topic deploy configure
+                curEntity = topicDeployMapper.getTopicConfByeRecKey(entity.getRecordKey());
+                if (isAddOp) {
+                    if (curEntity != null) {
+                        // both cases fail with DERR_EXISTED; only the message differs
+                        if (curEntity.isValidTopicStatus()) {
+                            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                                    strBuff.append("Existed record found for brokerId-topicName(")
+                                            .append(curEntity.getRecordKey()).append(")!").toString());
+                        } else {
+                            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                                    strBuff.append("Softly deleted record found for brokerId-topicName(")
+                                            .append(curEntity.getRecordKey())
+                                            .append("), please resume or remove it first!").toString());
+                        }
+                        strBuff.delete(0, strBuff.length());
+                        return result.isSuccess();
+                    }
+                    // add record
+                    topicDeployMapper.addTopicDeployConf(entity, strBuff, result);
+                } else {
+                    printPrefix = "[updTopicDeployConf], ";
+                    if (curEntity == null) {
+                        result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                                strBuff.append("Not found topic deploy configure for brokerId-topicName(")
+                                        .append(entity.getRecordKey()).append(")!").toString());
+                        strBuff.delete(0, strBuff.length());
+                        return result.isSuccess();
+                    }
+                    // update record
+                    topicDeployMapper.updTopicDeployConf(entity, strBuff, result);
+                }
+                // re-read the stored record for the log line below
+                newEntity = topicDeployMapper.getTopicConfByeRecKey(entity.getRecordKey());
+            } finally {
+                // release the inner (broker) lock before the outer (topic) lock
+                if (brokerLockId != null) {
+                    metaRowLock.releaseRowLock(brokerLockId);
+                }
+            }
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            if (topicLockId != null) {
+                metaRowLock.releaseRowLock(topicLockId);
+            }
+        }
+        // print log to file
+        if (result.isSuccess()) {
+            if (isAddOp) {
+                strBuff.append(printPrefix).append(entity.getCreateUser())
+                        .append(" added topic deploy configure: ").append(newEntity);
+            } else {
+                strBuff.append(printPrefix).append(entity.getModifyUser())
+                        .append(" updated topic deploy configure from ")
+                        .append(curEntity).append(" to ").append(newEntity);
+            }
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    @Override
+    public boolean updTopicDeployStatusInfo(BaseEntity opEntity, int brokerId,
+                                            String topicName, TopicStatus topicStatus,
+                                            StringBuilder strBuff, ProcessResult result) {
+        TopicDeployEntity curEntity;
+        TopicDeployEntity newEntity;
+        String printPrefix = "[updTopicDeployConf], ";
+        Integer topicLockId = null;
+        Integer brokerLockId = null;
+        // execute add or update operation
+        try {
+            // lock topicName meta-lock
+            topicLockId = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(topicName), true);
+            try {
+                // lock brokerId meta-lock
+                brokerLockId = metaRowLock.getLock(null,
+                        StringUtils.getBytesUtf8(String.valueOf(brokerId)), true);
+                // check broker configure exist
+                BrokerConfEntity brokerEntity =
+                        brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
+                if (brokerEntity == null) {
+                    result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                            strBuff.append("Not found broker configure by brokerId=")
+                                    .append(brokerId)
+                                    .append(", please create the broker's configure first!").toString());
+                    strBuff.delete(0, strBuff.length());
+                    return result.isSuccess();
+                }
+                // check topic deploy configure
+                curEntity = topicDeployMapper.getTopicConf(brokerId, topicName);
+                if (curEntity == null) {
+                    result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                            strBuff.append("Not found topic deploy configure for brokerId-topicName(")
+                                    .append(brokerId).append("-").append(topicName)
+                                    .append(")!").toString());
+                    strBuff.delete(0, strBuff.length());
+                    return result.isSuccess();
+                }
+                if (curEntity.getTopicStatus() == topicStatus) {
+                    result.setSuccResult(null);
+                    return result.isSuccess();
+                }
+                printPrefix = "[updTopicDeployConf], ";
+                // update record
+                topicDeployMapper.updTopicDeployStatus(opEntity,
+                        brokerId, topicName, topicStatus, strBuff, result);
+                newEntity = topicDeployMapper.getTopicConfByeRecKey(curEntity.getRecordKey());
+            } finally {
+                if (brokerLockId != null) {
+                    metaRowLock.releaseRowLock(brokerLockId);
+                }
+            }
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            if (topicLockId != null) {
+                metaRowLock.releaseRowLock(topicLockId);
+            }
+        }
+        // print log to file
+        if (result.isSuccess()) {
+            strBuff.append(printPrefix).append(opEntity.getModifyUser())
+                    .append(" updated topic deploy configure from ")
+                    .append(curEntity).append(" to ").append(newEntity);
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Query topic deploy configures through the topic deploy mapper.
+     *
+     * @param topicNameSet  the topic names to query
+     * @param brokerIdSet   the broker ids to query
+     * @param qryEntity     the entity carrying the filter conditions
+     * @return the mapper's query result
+     */
+    @Override
+    public Map<String, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+                                                                      Set<Integer> brokerIdSet,
+                                                                      TopicDeployEntity qryEntity) {
+        return topicDeployMapper.getTopicConfMap(
+                topicNameSet, brokerIdSet, qryEntity);
+    }
+
+    /**
+     * Query topic deploy configures restricted to brokers that have a broker configure.
+     *
+     * @param topicNameSet  the topic names to query
+     * @param brokerIdSet   the broker ids used to look up broker configures
+     * @return the mapper's query result, or an empty map when the broker
+     *         configure lookup returns nothing
+     */
+    @Override
+    public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+                                                                       Set<Integer> brokerIdSet) {
+        Map<Integer, BrokerConfEntity> foundBrokers =
+                brokerConfigMapper.getBrokerConfInfo(brokerIdSet, null, null);
+        if (!foundBrokers.isEmpty()) {
+            // only the ids of brokers that were actually found are passed on
+            return topicDeployMapper.getTopicDeployInfoMap(topicNameSet, foundBrokers.keySet());
+        }
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Query topic deploy configures by topic names and broker ids through
+     * the topic deploy mapper.
+     *
+     * @param topicNameSet  the topic names to query
+     * @param brokerIdSet   the broker ids to query
+     * @return the mapper's query result
+     */
+    @Override
+    public Map<String, List<TopicDeployEntity>> getTopicConfInfoByTopicAndBrokerIds(
+            Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+        return topicDeployMapper.getTopicConfMapByTopicAndBrokerIds(
+                topicNameSet, brokerIdSet);
+    }
+
+    // //////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Build a group control entity from the given fields, then add or update it.
+     *
+     * @param isAddOp             true to add the record, false to update it
+     * @param opEntity            the operation info; also supplies the data version id
+     * @param groupName           the group name
+     * @param resCheckEnable      the resource check switch to set
+     * @param allowedBClientRate  the allowed broker-client rate to set
+     * @param qryPriorityId       the query priority id to set
+     * @param flowCtrlEnable      the flow control switch to set
+     * @param flowRuleCnt         the flow control rule count to set
+     * @param flowCtrlInfo        the flow control rule info to set
+     * @param strBuff             the string buffer
+     * @param result              the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean addOrUpdGroupResCtrlConf(boolean isAddOp, BaseEntity opEntity,
+                                            String groupName, Boolean resCheckEnable,
+                                            int allowedBClientRate, int qryPriorityId,
+                                            Boolean flowCtrlEnable, int flowRuleCnt,
+                                            String flowCtrlInfo, StringBuilder strBuff,
+                                            ProcessResult result) {
+        GroupResCtrlEntity ctrlEntity = new GroupResCtrlEntity(opEntity, groupName);
+        ctrlEntity.updModifyInfo(
+                opEntity.getDataVerId(),
+                resCheckEnable,
+                allowedBClientRate,
+                qryPriorityId,
+                flowCtrlEnable,
+                flowRuleCnt,
+                flowCtrlInfo);
+        return addOrUpdGroupResCtrlConf(isAddOp, ctrlEntity, strBuff, result);
+    }
+
+    /**
+     * Add or update a group control configure with the consistency check enabled.
+     *
+     * @param isAddOp  true to add the record, false to update it
+     * @param entity   the group control entity to store
+     * @param strBuff  the string buffer
+     * @param result   the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean addOrUpdGroupResCtrlConf(boolean isAddOp, GroupResCtrlEntity entity,
+                                            StringBuilder strBuff, ProcessResult result) {
+        // chkConsistent = true
+        return addOrUpdGroupCtrlConf(true, isAddOp, entity, strBuff, result);
+    }
+
+    @Override
+    public boolean insertGroupCtrlConf(BaseEntity opEntity, String groupName,
+                                       int qryPriorityId, Boolean flowCtrlEnable,
+                                       int flowRuleCnt, String flowCtrlRuleInfo,
+                                       StringBuilder strBuff, ProcessResult result) {
+        GroupResCtrlEntity newEntity = new GroupResCtrlEntity(opEntity, groupName);
+        newEntity.updModifyInfo(opEntity.getDataVerId(), null,
+                TBaseConstants.META_VALUE_UNDEFINED, qryPriorityId,
+                flowCtrlEnable, flowRuleCnt, flowCtrlRuleInfo);
+        return addOrUpdGroupCtrlConf(false,
+                false, newEntity, strBuff, result);
+    }
+
+    @Override
+    public boolean insertGroupCtrlConf(BaseEntity opEntity, String groupName,
+                                       Boolean resChkEnable, int allowedB2CRate,
+                                       StringBuilder strBuff, ProcessResult result) {
+        GroupResCtrlEntity newEntity = new GroupResCtrlEntity(opEntity, groupName);
+        newEntity.updModifyInfo(opEntity.getDataVerId(), resChkEnable, allowedB2CRate,
+                TBaseConstants.META_VALUE_UNDEFINED, null,
+                TBaseConstants.META_VALUE_UNDEFINED, null);
+        return addOrUpdGroupCtrlConf(false,
+                false, newEntity, strBuff, result);
+    }
+
+    /**
+     * Insert a group control configure: the record is added when absent and
+     * updated when present.
+     *
+     * @param entity   the group control entity to store
+     * @param strBuff  the string buffer
+     * @param result   the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean insertGroupCtrlConf(GroupResCtrlEntity entity,
+                                       StringBuilder strBuff, ProcessResult result) {
+        // chkConsistent = false, isAddOpOrOnlyAdd = false
+        return addOrUpdGroupCtrlConf(
+                false, false, entity, strBuff, result);
+    }
+
+    /**
+     * Add a group control configure filled with default values when the group
+     * has no configure yet; succeed without change otherwise.
+     *
+     * @param opEntity  the operation info
+     * @param groupName the group name to add
+     * @param strBuff   the string buffer
+     * @param result    the process result return
+     * @return true if success otherwise false
+     */
+    private boolean addGroupCtrlConfIfAbsent(BaseEntity opEntity, String groupName,
+                                             StringBuilder strBuff, ProcessResult result) {
+        if (groupResCtrlMapper.getGroupResCtrlConf(groupName) == null) {
+            GroupResCtrlEntity defEntity = new GroupResCtrlEntity(opEntity, groupName);
+            defEntity.fillDefaultValue();
+            // add-only without consistency check
+            return addOrUpdGroupCtrlConf(false, true, defEntity, strBuff, result);
+        }
+        // the record is already there, nothing to add
+        result.setSuccResult(null);
+        return true;
+    }
+
+    /**
+     * Add or update a group control configure while holding the group-name row lock.
+     *
+     * @param chkConsistent     whether to enforce the operation precondition: when true,
+     *                          updating a missing record or adding an existing record
+     *                          fails; when false, a missing record is added and adding
+     *                          an existing record succeeds without change
+     * @param isAddOpOrOnlyAdd  true for an add (or add-only) request, false for an update
+     * @param entity            the entity to add or update
+     * @param strBuff           the string buffer
+     * @param result            the process result return
+     * @return true if success otherwise false
+     */
+    private boolean addOrUpdGroupCtrlConf(boolean chkConsistent, boolean isAddOpOrOnlyAdd,
+                                          GroupResCtrlEntity entity, StringBuilder strBuff,
+                                          ProcessResult result) {
+        Integer rowLockId = null;
+        boolean inserted = true;
+        GroupResCtrlEntity oldEntity;
+        GroupResCtrlEntity storedEntity;
+        String logTag = "[addGroupCtrlConf], ";
+        try {
+            // serialize changes made to the same group name
+            rowLockId = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(entity.getGroupName()), true);
+            oldEntity = groupResCtrlMapper.getGroupResCtrlConf(entity.getGroupName());
+            if (oldEntity != null) {
+                if (isAddOpOrOnlyAdd) {
+                    // the record exists: only a consistency-checked add treats this as an error
+                    if (!chkConsistent) {
+                        result.setSuccResult(null);
+                        return result.isSuccess();
+                    }
+                    result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                            strBuff.append("Existed record found for groupName(")
+                                    .append(entity.getGroupName()).append(")!").toString());
+                    strBuff.setLength(0);
+                    return result.isSuccess();
+                }
+                inserted = false;
+                logTag = "[updGroupCtrlConf], ";
+                groupResCtrlMapper.updGroupResCtrlConf(entity, strBuff, result);
+            } else {
+                // the record is missing: only a consistency-checked update treats this as an error
+                if (chkConsistent && !isAddOpOrOnlyAdd) {
+                    result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                            strBuff.append("Not found group control configure for groupName(")
+                                    .append(entity.getGroupName()).append(")!").toString());
+                    strBuff.setLength(0);
+                    return result.isSuccess();
+                }
+                groupResCtrlMapper.addGroupResCtrlConf(entity, strBuff, result);
+            }
+            // re-read the stored record for the log line below
+            storedEntity = groupResCtrlMapper.getGroupResCtrlConf(entity.getGroupName());
+        } catch (Throwable e) {
+            return logExceptionInfo(e, logTag, strBuff, result);
+        } finally {
+            if (rowLockId != null) {
+                metaRowLock.releaseRowLock(rowLockId);
+            }
+        }
+        // record the successful change in the log file
+        if (result.isSuccess()) {
+            strBuff.append(logTag);
+            if (inserted) {
+                strBuff.append(entity.getCreateUser())
+                        .append(" added group control configure: ").append(storedEntity);
+            } else {
+                strBuff.append(entity.getModifyUser())
+                        .append(" updated group control configure from ")
+                        .append(oldEntity).append(" to ").append(storedEntity);
+            }
+            logger.info(strBuff.toString());
+            strBuff.setLength(0);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Delete a group control configure while holding the group-name row lock.
+     *
+     * Deleting a record that does not exist is treated as success, in line with
+     * delConsumeCtrlConf. The deletion is refused while the group name is still
+     * used by consume control configures.
+     *
+     * @param operator   the operator, used in the log line
+     * @param groupName  the group name whose configure is deleted
+     * @param strBuff    the string buffer
+     * @param result     the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean delGroupCtrlConf(String operator, String groupName,
+                                    StringBuilder strBuff, ProcessResult result) {
+        Integer lid = null;
+        GroupResCtrlEntity curEntity;
+        String printPrefix = "[delGroupCtrlConf], ";
+        // execute delete operation
+        try {
+            // lock group name meta-lock
+            lid = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(groupName), true);
+            // get current record and judge execute condition
+            curEntity = groupResCtrlMapper.getGroupResCtrlConf(groupName);
+            if (curEntity == null) {
+                // nothing to delete: report success rather than a failure with a null message
+                result.setSuccResult(null);
+                return result.isSuccess();
+            }
+            if (consumeCtrlMapper.isGroupNameInUse(groupName)) {
+                result.setFailResult(DataOpErrCode.DERR_CONDITION_LACK.getCode(),
+                        strBuff.append("Group ").append(groupName)
+                                .append(" has consume control configures")
+                                .append(", please delete consume control configures first!")
+                                .toString());
+                strBuff.delete(0, strBuff.length());
+                return result.isSuccess();
+            }
+            groupResCtrlMapper.delGroupResCtrlConf(groupName, strBuff, result);
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            if (lid != null) {
+                metaRowLock.releaseRowLock(lid);
+            }
+        }
+        // print log to file
+        if (result.isSuccess()) {
+            strBuff.append(printPrefix).append(operator)
+                    .append(" deleted group control configure: ").append(curEntity);
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Query group control configures through the group control mapper.
+     *
+     * @param groupSet   the group names to query
+     * @param qryEntity  the entity carrying the filter conditions
+     * @return the mapper's query result
+     */
+    @Override
+    public Map<String, GroupResCtrlEntity> getGroupCtrlConf(Set<String> groupSet,
+                                                            GroupResCtrlEntity qryEntity) {
+        return groupResCtrlMapper.getGroupResCtrlConf(
+                groupSet, qryEntity);
+    }
+
+    /**
+     * Get the group control configure of one group through the group control mapper.
+     *
+     * @param groupName  the group name to query
+     * @return the mapper's query result
+     */
+    @Override
+    public GroupResCtrlEntity getGroupCtrlConf(String groupName) {
+        return groupResCtrlMapper
+                .getGroupResCtrlConf(groupName);
+    }
+
+    // //////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Build a consume control entity from the given fields, then add or update it
+     * with the consistency check enabled.
+     *
+     * @param isAddOp     true to add the record, false to update it
+     * @param opEntity    the operation info; also supplies the data version id
+     * @param groupName   the group name
+     * @param topicName   the topic name
+     * @param enableCsm   the consume switch to set
+     * @param disableRsn  the disable reason to set
+     * @param enableFlt   the filter switch to set
+     * @param fltCondStr  the filter condition string to set
+     * @param strBuff     the string buffer
+     * @param result      the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean addOrUpdConsumeCtrlInfo(boolean isAddOp, BaseEntity opEntity,
+                                           String groupName, String topicName,
+                                           Boolean enableCsm, String disableRsn,
+                                           Boolean enableFlt, String fltCondStr,
+                                           StringBuilder strBuff, ProcessResult result) {
+        GroupConsumeCtrlEntity ctrlEntity =
+                new GroupConsumeCtrlEntity(opEntity, groupName, topicName);
+        ctrlEntity.updModifyInfo(
+                opEntity.getDataVerId(),
+                enableCsm,
+                disableRsn,
+                enableFlt,
+                fltCondStr);
+        return addOrUpdConsumeCtrlConf(true, isAddOp, ctrlEntity, strBuff, result);
+    }
+
+    /**
+     * Add or update a consume control configure with the consistency check enabled.
+     *
+     * @param isAddOp  true to add the record, false to update it
+     * @param entity   the consume control entity to store
+     * @param strBuff  the string buffer
+     * @param result   the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean addOrUpdConsumeCtrlInfo(boolean isAddOp, GroupConsumeCtrlEntity entity,
+                                           StringBuilder strBuff, ProcessResult result) {
+        // chkConsistent = true
+        return addOrUpdConsumeCtrlConf(
+                true, isAddOp, entity, strBuff, result);
+    }
+
+    /**
+     * Build a consume control entity from the given fields and insert it: the
+     * record is added when absent and updated when present.
+     *
+     * @param opEntity    the operation info; also supplies the data version id
+     * @param groupName   the group name
+     * @param topicName   the topic name
+     * @param enableCsm   the consume switch to set
+     * @param disReason   the disable reason to set
+     * @param enableFlt   the filter switch to set
+     * @param fltCondStr  the filter condition string to set
+     * @param strBuff     the string buffer
+     * @param result      the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean insertConsumeCtrlInfo(BaseEntity opEntity, String groupName,
+                                         String topicName, Boolean enableCsm,
+                                         String disReason, Boolean enableFlt,
+                                         String fltCondStr, StringBuilder strBuff,
+                                         ProcessResult result) {
+        GroupConsumeCtrlEntity ctrlEntity =
+                new GroupConsumeCtrlEntity(opEntity, groupName, topicName);
+        ctrlEntity.updModifyInfo(
+                opEntity.getDataVerId(),
+                enableCsm,
+                disReason,
+                enableFlt,
+                fltCondStr);
+        return addOrUpdConsumeCtrlConf(false, false, ctrlEntity, strBuff, result);
+    }
+
+    /**
+     * Insert a consume control configure: the record is added when absent and
+     * updated when present.
+     *
+     * @param entity   the consume control entity to store
+     * @param strBuff  the string buffer
+     * @param result   the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean insertConsumeCtrlInfo(GroupConsumeCtrlEntity entity,
+                                         StringBuilder strBuff, ProcessResult result) {
+        // chkConsistent = false, isAddOpOrOnlyAdd = false
+        return addOrUpdConsumeCtrlConf(
+                false, false, entity, strBuff, result);
+    }
+
+    /**
+     * Add or update a consume control configure.
+     *
+     * The topic control configure and the group control configure are added first
+     * if absent. The change is then applied while holding the topic-name row lock
+     * and, nested inside it, the group-name row lock.
+     *
+     * @param chkConsistent     whether to enforce the operation precondition: when true,
+     *                          updating a missing record or adding an existing record
+     *                          fails; when false, a missing record is added and adding
+     *                          an existing record succeeds without change
+     * @param isAddOpOrOnlyAdd  true for an add (or add-only) request, false for an update
+     * @param entity            the entity to add or update
+     * @param strBuff           the string buffer
+     * @param result            the process result return
+     * @return true if success otherwise false
+     */
+    private boolean addOrUpdConsumeCtrlConf(boolean chkConsistent, boolean isAddOpOrOnlyAdd,
+                                            GroupConsumeCtrlEntity entity, StringBuilder strBuff,
+                                            ProcessResult result) {
+        boolean addRecord = true;
+        Integer topicLockId = null;
+        Integer groupLockId = null;
+        GroupConsumeCtrlEntity curEntity;
+        GroupConsumeCtrlEntity newEntity;
+        String printPrefix = "[addConsumeCtrlConf], ";
+        // append topic control configure
+        // (the return value is not checked here; result is overwritten below)
+        addTopicCtrlConfIfAbsent(entity, entity.getTopicName(), strBuff, result);
+        // append group control configure
+        // (the return value is not checked here either)
+        addGroupCtrlConfIfAbsent(entity, entity.getGroupName(), strBuff, result);
+        // execute add or update operation
+        try {
+            // lock topicName meta-lock
+            topicLockId = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(entity.getTopicName()), true);
+            try {
+                // lock groupName meta-lock
+                groupLockId = metaRowLock.getLock(null,
+                        StringUtils.getBytesUtf8(entity.getGroupName()), true);
+                // check consume control configure exist
+                curEntity = consumeCtrlMapper.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
+                if (curEntity == null) {
+                    // a consistency-checked update of a missing record is rejected;
+                    // in every other case the record is added
+                    if (chkConsistent && !isAddOpOrOnlyAdd) {
+                        result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                                strBuff.append("Not found consume control for groupName-topicName(")
+                                        .append(entity.getRecordKey()).append(")!").toString());
+                        strBuff.delete(0, strBuff.length());
+                        return result.isSuccess();
+                    }
+                    consumeCtrlMapper.addGroupConsumeCtrlConf(entity, strBuff, result);
+                } else {
+                    if (isAddOpOrOnlyAdd) {
+                        // the record already exists: an error for a consistency-checked add,
+                        // a successful no-op otherwise
+                        if (chkConsistent) {
+                            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                                    strBuff.append("Existed record found for groupName-topicName(")
+                                            .append(entity.getRecordKey()).append(")!").toString());
+                            strBuff.delete(0, strBuff.length());
+                        } else {
+                            result.setSuccResult(null);
+                        }
+                        return result.isSuccess();
+                    }
+                    addRecord = false;
+                    printPrefix = "[updConsumeCtrlConf], ";
+                    consumeCtrlMapper.updGroupConsumeCtrlConf(entity, strBuff, result);
+                }
+                // re-read the stored record for the log line below
+                newEntity = consumeCtrlMapper.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
+            } finally {
+                // release the inner (group) lock before the outer (topic) lock
+                if (groupLockId != null) {
+                    metaRowLock.releaseRowLock(groupLockId);
+                }
+            }
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            if (topicLockId != null) {
+                metaRowLock.releaseRowLock(topicLockId);
+            }
+        }
+        // print log to file
+        if (result.isSuccess()) {
+            if (addRecord) {
+                strBuff.append(printPrefix).append(entity.getCreateUser())
+                        .append(" added consume control configure: ").append(newEntity);
+            } else {
+                strBuff.append(printPrefix).append(entity.getModifyUser())
+                        .append(" updated consume control configure from ")
+                        .append(curEntity).append(" to ").append(newEntity);
+            }
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Delete a consume control configure while holding the topic-name row lock
+     * and, nested inside it, the group-name row lock. Deleting a record that
+     * does not exist is treated as success.
+     *
+     * @param operator   the operator, used in the log line
+     * @param groupName  the group name of the record
+     * @param topicName  the topic name of the record
+     * @param strBuff    the string buffer
+     * @param result     the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean delConsumeCtrlConf(String operator,
+                                      String groupName, String topicName,
+                                      StringBuilder strBuff, ProcessResult result) {
+        Integer outerLockId = null;
+        Integer innerLockId = null;
+        GroupConsumeCtrlEntity removedEntity;
+        final String logTag = "[delConsumeCtrlConf], ";
+        try {
+            // outer lock: topic name
+            outerLockId = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(topicName), true);
+            try {
+                // inner lock: group name
+                innerLockId = metaRowLock.getLock(null,
+                        StringUtils.getBytesUtf8(groupName), true);
+                removedEntity =
+                        consumeCtrlMapper.getConsumeCtrlByGroupAndTopic(groupName, topicName);
+                if (removedEntity != null) {
+                    consumeCtrlMapper.delGroupConsumeCtrlConf(
+                            groupName, topicName, strBuff, result);
+                } else {
+                    // nothing stored for this group-topic pair
+                    result.setSuccResult(null);
+                    return result.isSuccess();
+                }
+            } finally {
+                if (innerLockId != null) {
+                    metaRowLock.releaseRowLock(innerLockId);
+                }
+            }
+        } catch (Throwable e) {
+            return logExceptionInfo(e, logTag, strBuff, result);
+        } finally {
+            if (outerLockId != null) {
+                metaRowLock.releaseRowLock(outerLockId);
+            }
+        }
+        // record the successful deletion in the log file
+        if (result.isSuccess()) {
+            strBuff.append(logTag).append(operator)
+                    .append(" deleted consume control configure: ").append(removedEntity);
+            logger.info(strBuff.toString());
+            strBuff.setLength(0);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Get the consume control configures of one topic through the consume control mapper.
+     *
+     * @param topicName  the topic name to query
+     * @return the mapper's query result
+     */
+    @Override
+    public List<GroupConsumeCtrlEntity> getConsumeCtrlByTopic(String topicName) {
+        return consumeCtrlMapper
+                .getConsumeCtrlByTopicName(topicName);
+    }
+
+    /**
+     * Collect the names of the topics whose consume control configure for the
+     * given group has consuming disabled.
+     *
+     * @param groupName  the group name to query
+     * @return a new set of topic names; empty when none is disabled
+     */
+    @Override
+    public Set<String> getDisableTopicByGroupName(String groupName) {
+        Set<String> disabledTopics = new HashSet<>();
+        List<GroupConsumeCtrlEntity> ctrlEntities =
+                consumeCtrlMapper.getConsumeCtrlByGroupName(groupName);
+        for (GroupConsumeCtrlEntity item : ctrlEntities) {
+            // skip null entries and entries that still allow consuming
+            if (item == null || item.isEnableConsume()) {
+                continue;
+            }
+            disabledTopics.add(item.getTopicName());
+        }
+        return disabledTopics;
+    }
+
+    /**
+     * Query consume control configures through the consume control mapper.
+     *
+     * @param groupSet  the group names to query
+     * @param topicSet  the topic names to query
+     * @param qryEntry  the entity carrying the filter conditions
+     * @return the mapper's query result
+     */
+    @Override
+    public Map<String, List<GroupConsumeCtrlEntity>> getGroupConsumeCtrlConf(
+            Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
+        return consumeCtrlMapper.getConsumeCtrlInfoMap(
+                groupSet, topicSet, qryEntry);
+    }
+
+    /**
+     * Initial meta-data stores.
+     *
+     * The body is empty in this class. The method is protected, so presumably
+     * subclasses are expected to override it - TODO confirm.
+     */
+    protected void initMetaStore() {
+
+    }
+
+    /**
+     * Reload meta-data stores.
+     *
+     * Runs in three steps: clear the cached data of every registered observer,
+     * call loadConfig on each mapper in the order written below, then ask every
+     * observer to reload its cache.
+     *
+     * @param strBuff  the string buffer passed to each mapper's loadConfig
+     */
+    private void reloadMetaStore(StringBuilder strBuff) {
+        // Clear observers' cache data.
+        for (AliveObserver observer : eventObservers) {
+            observer.clearCacheData();
+        }
+        // Load the latest meta-data from persistent
+        clusterConfigMapper.loadConfig(strBuff);
+        brokerConfigMapper.loadConfig(strBuff);
+        topicDeployMapper.loadConfig(strBuff);
+        topicCtrlMapper.loadConfig(strBuff);
+        groupResCtrlMapper.loadConfig(strBuff);
+        consumeCtrlMapper.loadConfig(strBuff);
+        // load the latest meta-data to observers
+        for (AliveObserver observer : eventObservers) {
+            observer.reloadCacheData();
+        }
+    }
+
+    /**
+     * Close meta-data stores.
+     *
+     * Calls close() on each of the six mappers in the order written below.
+     */
+    private void closeMetaStore() {
+        brokerConfigMapper.close();
+        topicDeployMapper.close();
+        groupResCtrlMapper.close();
+        topicCtrlMapper.close();
+        consumeCtrlMapper.close();
+        clusterConfigMapper.close();
+    }
+
+    private boolean logExceptionInfo(Throwable e, String printPrefix,
+                                     StringBuilder strBuff, ProcessResult result) {
+        strBuff.delete(0, strBuff.length());
+        strBuff.append(printPrefix).append("failed to lock meta-lock.");
+        logger.warn(strBuff.toString(), e);
+        result.setFailResult(DataOpErrCode.DERR_STORE_LOCK_FAILURE.getCode(),
+                strBuff.toString());
+        strBuff.delete(0, strBuff.length());
+        return result.isSuccess();
+    }
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
index 9504ba1..f9805c7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
@@ -86,7 +86,7 @@ public abstract class AbsTopicCtrlMapperImpl implements TopicCtrlMapper {
         // Store data to persistent
         if (putConfig2Persistent(newEntity, strBuff, result)) {
             topicCtrlCache.put(newEntity.getTopicName(), newEntity);
-            result.setSuccResult(curEntity);
+            result.setSuccResult(null);
         }
         return result.isSuccess();
     }
@@ -103,7 +103,7 @@ public abstract class AbsTopicCtrlMapperImpl implements TopicCtrlMapper {
         }
         delConfigFromPersistent(topicName, strBuff);
         topicCtrlCache.remove(topicName);
-        result.setSuccResult(curEntity);
+        result.setSuccResult(null);
         return result.isSuccess();
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
index 0951c16..d34a4f3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
@@ -32,6 +32,7 @@ import org.apache.inlong.tubemq.corebase.utils.KeyBuilderUtils;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
 import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
 import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployMapper;
 import org.slf4j.Logger;
@@ -42,24 +43,24 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
             LoggerFactory.getLogger(AbsTopicDeployMapperImpl.class);
     // data cache
     private final ConcurrentHashMap<String/* recordKey */, TopicDeployEntity>
-            topicConfCache = new ConcurrentHashMap<>();
+            topicDeployCache = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashSet<String>>
-            brokerIdCacheIndex = new ConcurrentHashMap<>();
+            brokerId2RecordCache = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String>>
-            topicNameCacheIndex = new ConcurrentHashMap<>();
+            topicName2RecordCache = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashSet<String>>
-            brokerId2TopicCacheIndex = new ConcurrentHashMap<>();
+            brokerId2TopicNameCache = new ConcurrentHashMap<>();
 
     public AbsTopicDeployMapperImpl() {
         // Initial instant
     }
 
     @Override
-    public boolean addTopicConf(TopicDeployEntity entity,
-                                StringBuilder strBuff, ProcessResult result) {
+    public boolean addTopicDeployConf(TopicDeployEntity entity,
+                                      StringBuilder strBuff, ProcessResult result) {
         // Checks whether the record already exists
         TopicDeployEntity curEntity =
-                topicConfCache.get(entity.getRecordKey());
+                topicDeployCache.get(entity.getRecordKey());
         if (curEntity != null) {
             if (curEntity.isValidTopicStatus()) {
                 result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
@@ -75,7 +76,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
             return result.isSuccess();
         }
         // valid whether system topic
-        if (!validSysTopicConfigure(entity, strBuff, result)) {
+        if (!isValidSysTopicConf(entity, strBuff, result)) {
             return result.isSuccess();
         }
         // check deploy status if still accept publish and subscribe
@@ -96,11 +97,11 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
     }
 
     @Override
-    public boolean updTopicConf(TopicDeployEntity entity,
-                                StringBuilder strBuff, ProcessResult result) {
+    public boolean updTopicDeployConf(TopicDeployEntity entity,
+                                      StringBuilder strBuff, ProcessResult result) {
         // Checks whether the record already exists
         TopicDeployEntity curEntity =
-                topicConfCache.get(entity.getRecordKey());
+                topicDeployCache.get(entity.getRecordKey());
         if (curEntity == null) {
             result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
                     strBuff.append("Not found topic deploy configure for brokerId-topicName(")
@@ -120,25 +121,61 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
             return result.isSuccess();
         }
         // valid whether system topic
-        if (!validSysTopicConfigure(newEntity, strBuff, result)) {
+        if (!isValidSysTopicConf(newEntity, strBuff, result)) {
             return result.isSuccess();
         }
         // check deploy status
-        if (isIllegalValuesChange(newEntity, curEntity, strBuff, result)) {
+        if (!isValidValuesChange(newEntity, curEntity, strBuff, result)) {
             return result.isSuccess();
         }
         // Store data to persistent
         if (putConfig2Persistent(newEntity, strBuff, result)) {
             putRecord2Caches(newEntity);
-            result.setSuccResult(curEntity);
+            result.setSuccResult(null);
         }
         return result.isSuccess();
     }
 
     @Override
-    public boolean delTopicConf(String recordKey, StringBuilder strBuff, ProcessResult result) {
+    public boolean updTopicDeployStatus(BaseEntity opEntity, int brokerId,
+                                        String topicName, TopicStatus topicStatus,
+                                        StringBuilder strBuff, ProcessResult result) {
+        // Look up the cached record for brokerId-topicName; fail with DERR_NOT_EXIST if absent
+        TopicDeployEntity curEntity = getTopicConf(brokerId, topicName);
+        if (curEntity == null) {
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                    strBuff.append("Not found topic deploy configure for brokerId-topicName(")
+                            .append(brokerId).append("-").append(topicName)
+                            .append(")!").toString());
+            strBuff.delete(0, strBuff.length());    // leave the shared buffer empty for the caller
+            return result.isSuccess();
+        }
+        // Apply the change to a clone; the cache is only replaced after the store succeeds
+        TopicDeployEntity newEntity = curEntity.clone();
+        newEntity.updBaseModifyInfo(opEntity);
+        if (!newEntity.updModifyInfo(opEntity.getDataVerId(),
+                TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+                null, topicStatus, null)) {    // only the data version id and topicStatus are supplied
+            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                    "Topic deploy configure not changed!");
+            return result.isSuccess();    // strBuff was not touched on this path
+        }
+        // Reject invalid value changes; isValidValuesChange has already filled in the failure
+        if (!isValidValuesChange(newEntity, curEntity, strBuff, result)) {
+            return result.isSuccess();
+        }
+        // Persist first, then refresh the in-memory caches only if the store succeeded
+        if (putConfig2Persistent(newEntity, strBuff, result)) {
+            putRecord2Caches(newEntity);
+            result.setSuccResult(null);    // success carries no return data
+        }
+        return result.isSuccess();
+    }
+
+    @Override
+    public boolean delTopicDeployConf(String recordKey, StringBuilder strBuff, ProcessResult result) {
         TopicDeployEntity curEntity =
-                topicConfCache.get(recordKey);
+                topicDeployCache.get(recordKey);
         if (curEntity == null) {
             result.setSuccResult(null);
             return result.isSuccess();
@@ -154,14 +191,14 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         }
         delConfigFromPersistent(recordKey, strBuff);
         delRecordFromCaches(recordKey);
-        result.setSuccResult(curEntity);
+        result.setSuccResult(null);
         return result.isSuccess();
     }
 
     @Override
     public boolean delTopicConfByBrokerId(Integer brokerId, StringBuilder strBuff, ProcessResult result) {
         ConcurrentHashSet<String> recordKeySet =
-                brokerIdCacheIndex.get(brokerId);
+                brokerId2RecordCache.get(brokerId);
         if (recordKeySet == null || recordKeySet.isEmpty()) {
             result.setSuccResult(null);
             return result.isSuccess();
@@ -169,7 +206,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         // check deploy status if still accept publish and subscribe
         TopicDeployEntity curEntity;
         for (String recordKey : recordKeySet) {
-            curEntity = topicConfCache.get(recordKey);
+            curEntity = topicDeployCache.get(recordKey);
             if (curEntity == null) {
                 continue;
             }
@@ -194,13 +231,13 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
     @Override
     public boolean hasConfiguredTopics(int brokerId) {
         ConcurrentHashSet<String> keySet =
-                brokerIdCacheIndex.get(brokerId);
+                brokerId2RecordCache.get(brokerId);
         return (keySet != null && !keySet.isEmpty());
     }
 
     @Override
     public boolean isTopicDeployed(String topicName) {
-        ConcurrentHashSet<String> deploySet = topicNameCacheIndex.get(topicName);
+        ConcurrentHashSet<String> deploySet = topicName2RecordCache.get(topicName);
         return (deploySet != null && !deploySet.isEmpty());
     }
 
@@ -208,9 +245,9 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
     public List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity) {
         List<TopicDeployEntity> retEntities = new ArrayList<>();
         if (qryEntity == null) {
-            retEntities.addAll(topicConfCache.values());
+            retEntities.addAll(topicDeployCache.values());
         } else {
-            for (TopicDeployEntity entity : topicConfCache.values()) {
+            for (TopicDeployEntity entity : topicDeployCache.values()) {
                 if (entity != null && entity.isMatched(qryEntity)) {
                     retEntities.add(entity);
                 }
@@ -223,12 +260,12 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
     public TopicDeployEntity getTopicConf(int brokerId, String topicName) {
         String recordKey =
                 KeyBuilderUtils.buildTopicConfRecKey(brokerId, topicName);
-        return topicConfCache.get(recordKey);
+        return topicDeployCache.get(recordKey);
     }
 
     @Override
     public TopicDeployEntity getTopicConfByeRecKey(String recordKey) {
-        return topicConfCache.get(recordKey);
+        return topicDeployCache.get(recordKey);
     }
 
     @Override
@@ -241,7 +278,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         Set<String> matchedKeySet = getMatchedRecords(topicNameSet, brokerIdSet);
         // filter record by qryEntity
         if (matchedKeySet == null) {
-            for (TopicDeployEntity entry :  topicConfCache.values()) {
+            for (TopicDeployEntity entry :  topicDeployCache.values()) {
                 if (entry == null || (qryEntity != null && !entry.isMatched(qryEntity))) {
                     continue;
                 }
@@ -252,7 +289,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         } else {
             TopicDeployEntity entry;
             for (String recKey : matchedKeySet) {
-                entry = topicConfCache.get(recKey);
+                entry = topicDeployCache.get(recKey);
                 if (entry == null || (qryEntity != null && !entry.isMatched(qryEntity))) {
                     continue;
                 }
@@ -278,10 +315,10 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         Set<String> matchedKeySet = getMatchedRecords(topicNameSet, brokerIdSet);
         // get record by keys
         if (matchedKeySet == null) {
-            matchedKeySet = new HashSet<>(topicConfCache.keySet());
+            matchedKeySet = new HashSet<>(topicDeployCache.keySet());
         }
         for (String recordKey: matchedKeySet) {
-            TopicDeployEntity entity = topicConfCache.get(recordKey);
+            TopicDeployEntity entity = topicDeployCache.get(recordKey);
             if (entity == null) {
                 continue;
             }
@@ -302,7 +339,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         Set<String> matchedKeySet = getMatchedRecords(topicSet, brokerIdSet);
         // get records by matched keys
         if (matchedKeySet == null) {
-            for (TopicDeployEntity entity : topicConfCache.values()) {
+            for (TopicDeployEntity entity : topicDeployCache.values()) {
                 if (entity == null) {
                     continue;
                 }
@@ -312,7 +349,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
             }
         } else {
             for (String key : matchedKeySet) {
-                tmpEntity = topicConfCache.get(key);
+                tmpEntity = topicDeployCache.get(key);
                 if (tmpEntity == null) {
                     continue;
                 }
@@ -328,12 +365,12 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
     public Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId) {
         TopicDeployEntity tmpEntity;
         Map<String, TopicDeployEntity> retEntityMap = new HashMap<>();
-        ConcurrentHashSet<String> records = brokerIdCacheIndex.get(brokerId);
+        ConcurrentHashSet<String> records = brokerId2RecordCache.get(brokerId);
         if (records == null || records.isEmpty()) {
             return retEntityMap;
         }
         for (String key : records) {
-            tmpEntity = topicConfCache.get(key);
+            tmpEntity = topicDeployCache.get(key);
             if (tmpEntity == null) {
                 continue;
             }
@@ -349,7 +386,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         Map<Integer, Set<String>> retEntityMap = new HashMap<>();
         if (brokerIdSet == null || brokerIdSet.isEmpty()) {
             for (Map.Entry<Integer, ConcurrentHashSet<String>> entry
-                    : brokerId2TopicCacheIndex.entrySet()) {
+                    : brokerId2TopicNameCache.entrySet()) {
                 if (entry.getKey() == null) {
                     continue;
                 }
@@ -365,7 +402,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
                     continue;
                 }
                 topicSet = new HashSet<>();
-                deploySet = brokerId2TopicCacheIndex.get(brokerId);
+                deploySet = brokerId2TopicNameCache.get(brokerId);
                 if (deploySet != null) {
                     topicSet.addAll(deploySet);
                 }
@@ -381,7 +418,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         Map<Integer, String> brokerInfoMap;
         Map<String, Map<Integer, String>> retEntityMap = new HashMap<>();
         if (topicNameSet == null || topicNameSet.isEmpty()) {
-            for (TopicDeployEntity entry : topicConfCache.values()) {
+            for (TopicDeployEntity entry : topicDeployCache.values()) {
                 if (entry == null) {
                     continue;
                 }
@@ -395,10 +432,10 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
                     continue;
                 }
                 brokerInfoMap = retEntityMap.computeIfAbsent(topicName, k -> new HashMap<>());
-                keySet = topicNameCacheIndex.get(topicName);
+                keySet = topicName2RecordCache.get(topicName);
                 if (keySet != null) {
                     for (String key : keySet) {
-                        TopicDeployEntity entry = topicConfCache.get(key);
+                        TopicDeployEntity entry = topicDeployCache.get(key);
                         if (entry != null) {
                             brokerInfoMap.put(entry.getBrokerId(), entry.getBrokerIp());
                         }
@@ -411,17 +448,17 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
 
     @Override
     public Set<String> getConfiguredTopicSet() {
-        return new HashSet<>(topicNameCacheIndex.keySet());
+        return new HashSet<>(topicName2RecordCache.keySet());
     }
 
     /**
      * Clear cached data
      */
     protected void clearCachedData() {
-        topicNameCacheIndex.clear();
-        brokerIdCacheIndex.clear();
-        brokerId2TopicCacheIndex.clear();
-        topicConfCache.clear();
+        topicName2RecordCache.clear();
+        brokerId2RecordCache.clear();
+        brokerId2TopicNameCache.clear();
+        topicDeployCache.clear();
     }
 
     /**
@@ -430,33 +467,33 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
      * @param entity  need added or updated entity
      */
     protected void putRecord2Caches(TopicDeployEntity entity) {
-        topicConfCache.put(entity.getRecordKey(), entity);
+        topicDeployCache.put(entity.getRecordKey(), entity);
         // add topic index map
         ConcurrentHashSet<String> keySet =
-                topicNameCacheIndex.get(entity.getTopicName());
+                topicName2RecordCache.get(entity.getTopicName());
         if (keySet == null) {
             ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
-            keySet = topicNameCacheIndex.putIfAbsent(entity.getTopicName(), tmpSet);
+            keySet = topicName2RecordCache.putIfAbsent(entity.getTopicName(), tmpSet);
             if (keySet == null) {
                 keySet = tmpSet;
             }
         }
         keySet.add(entity.getRecordKey());
         // add brokerId index map
-        keySet = brokerIdCacheIndex.get(entity.getBrokerId());
+        keySet = brokerId2RecordCache.get(entity.getBrokerId());
         if (keySet == null) {
             ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
-            keySet = brokerIdCacheIndex.putIfAbsent(entity.getBrokerId(), tmpSet);
+            keySet = brokerId2RecordCache.putIfAbsent(entity.getBrokerId(), tmpSet);
             if (keySet == null) {
                 keySet = tmpSet;
             }
         }
         keySet.add(entity.getRecordKey());
         // add brokerId topic map
-        keySet = brokerId2TopicCacheIndex.get(entity.getBrokerId());
+        keySet = brokerId2TopicNameCache.get(entity.getBrokerId());
         if (keySet == null) {
             ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
-            keySet = brokerId2TopicCacheIndex.putIfAbsent(entity.getBrokerId(), tmpSet);
+            keySet = brokerId2TopicNameCache.putIfAbsent(entity.getBrokerId(), tmpSet);
             if (keySet == null) {
                 keySet = tmpSet;
             }
@@ -486,33 +523,33 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
 
     private void delRecordFromCaches(String recordKey) {
         TopicDeployEntity curEntity =
-                topicConfCache.remove(recordKey);
+                topicDeployCache.remove(recordKey);
         if (curEntity == null) {
             return;
         }
         // add topic index
         ConcurrentHashSet<String> keySet =
-                topicNameCacheIndex.get(curEntity.getTopicName());
+                topicName2RecordCache.get(curEntity.getTopicName());
         if (keySet != null) {
             keySet.remove(recordKey);
             if (keySet.isEmpty()) {
-                topicNameCacheIndex.remove(curEntity.getTopicName());
+                topicName2RecordCache.remove(curEntity.getTopicName());
             }
         }
         // delete brokerId index
-        keySet = brokerIdCacheIndex.get(curEntity.getBrokerId());
+        keySet = brokerId2RecordCache.get(curEntity.getBrokerId());
         if (keySet != null) {
             keySet.remove(recordKey);
             if (keySet.isEmpty()) {
-                brokerIdCacheIndex.remove(curEntity.getBrokerId());
+                brokerId2RecordCache.remove(curEntity.getBrokerId());
             }
         }
         // delete broker topic map
-        keySet = brokerId2TopicCacheIndex.get(curEntity.getBrokerId());
+        keySet = brokerId2TopicNameCache.get(curEntity.getBrokerId());
         if (keySet != null) {
             keySet.remove(curEntity.getTopicName());
             if (keySet.isEmpty()) {
-                brokerId2TopicCacheIndex.remove(curEntity.getBrokerId());
+                brokerId2TopicNameCache.remove(curEntity.getBrokerId());
             }
         }
     }
@@ -527,7 +564,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         if (topicNameSet != null && !topicNameSet.isEmpty()) {
             topicKeySet = new HashSet<>();
             for (String topicName : topicNameSet) {
-                keySet = topicNameCacheIndex.get(topicName);
+                keySet = topicName2RecordCache.get(topicName);
                 if (keySet != null && !keySet.isEmpty()) {
                     topicKeySet.addAll(keySet);
                 }
@@ -540,7 +577,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
             brokerKeySet = new HashSet<>();
             for (Integer brokerId : brokerIdSet) {
-                keySet = brokerIdCacheIndex.get(brokerId);
+                keySet = brokerId2RecordCache.get(brokerId);
                 if (keySet != null && !keySet.isEmpty()) {
                     brokerKeySet.addAll(keySet);
                 }
@@ -570,19 +607,19 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
     }
 
     /**
-     * Check whether the change of deploy values is illegal
+     * Check whether the change of deploy values is valid
      * Attention, the newEntity and newEntity must not equal
      *
      * @param newEntity  the entity to be updated
      * @param curEntity  the current entity
      * @param strBuff    string buffer
      * @param result     check result of parameter value
-     * @return  true for illegal, false for legal
+     * @return  true for valid, false for invalid
      */
-    private boolean isIllegalValuesChange(TopicDeployEntity newEntity,
-                                          TopicDeployEntity curEntity,
-                                          StringBuilder strBuff,
-                                          ProcessResult result) {
+    private boolean isValidValuesChange(TopicDeployEntity newEntity,
+                                        TopicDeployEntity curEntity,
+                                        StringBuilder strBuff,
+                                        ProcessResult result) {
         // check if shrink data store block
         if (newEntity.getNumPartitions() != TBaseConstants.META_VALUE_UNDEFINED
                 && newEntity.getNumPartitions() < curEntity.getNumPartitions()) {
@@ -593,7 +630,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
                             .append("in brokerId-topicName(").append(curEntity.getRecordKey())
                             .append(") record!").toString());
             strBuff.delete(0, strBuff.length());
-            return !result.isSuccess();
+            return result.isSuccess();
         }
         if (newEntity.getNumTopicStores() != TBaseConstants.META_VALUE_UNDEFINED
                 && newEntity.getNumTopicStores() < curEntity.getNumTopicStores()) {
@@ -604,7 +641,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
                             .append("in brokerId-topicName(").append(curEntity.getRecordKey())
                             .append(") record!").toString());
             strBuff.delete(0, strBuff.length());
-            return !result.isSuccess();
+            return result.isSuccess();
         }
         // check whether the deploy status is equal
         if (newEntity.getTopicStatus() == curEntity.getTopicStatus()) {
@@ -614,9 +651,9 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
                                 .append(" please resume or hard remove for brokerId-topicName(")
                                 .append(newEntity.getRecordKey()).append(") record!").toString());
                 strBuff.delete(0, strBuff.length());
-                return !result.isSuccess();
+                return result.isSuccess();
             }
-            return false;
+            return true;
         }
         // check deploy status case from valid to invalid
         if (curEntity.isValidTopicStatus() && !newEntity.isValidTopicStatus()) {
@@ -627,7 +664,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
                                 .append(" before change status of brokerId-topicName(")
                                 .append(curEntity.getRecordKey()).append(") record!").toString());
                 strBuff.delete(0, strBuff.length());
-                return !result.isSuccess();
+                return result.isSuccess();
             }
             if (newEntity.getTopicStatus().getCode()
                     > TopicStatus.STATUS_TOPIC_SOFT_DELETE.getCode()) {
@@ -635,9 +672,9 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
                         strBuff.append("Please softly deleted the brokerId-topicName(")
                                 .append(newEntity.getRecordKey()).append(") record first!").toString());
                 strBuff.delete(0, strBuff.length());
-                return !result.isSuccess();
+                return result.isSuccess();
             }
-            return false;
+            return true;
         }
         // check deploy status case from invalid to invalid
         if (!curEntity.isValidTopicStatus() && !newEntity.isValidTopicStatus()) {
@@ -652,7 +689,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
                                 .append(" for the brokerId-topicName(")
                                 .append(newEntity.getRecordKey()).append(") record!").toString());
                 strBuff.delete(0, strBuff.length());
-                return !result.isSuccess();
+                return result.isSuccess();
             }
             if (newEntity.isAcceptPublish()
                     || newEntity.isAcceptSubscribe()) {
@@ -661,9 +698,9 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
                                 .append(" before change status of brokerId-topicName(")
                                 .append(newEntity.getRecordKey()).append(") record!").toString());
                 strBuff.delete(0, strBuff.length());
-                return !result.isSuccess();
+                return result.isSuccess();
             }
-            return false;
+            return true;
         }
         // check deploy status case from invalid to valid
         if (!curEntity.isValidTopicStatus() && newEntity.isValidTopicStatus()) {
@@ -697,10 +734,10 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
      * @param deployEntity   the topic configuration that needs to be added or updated
      * @param strBuff  the print info string buffer
      * @param result   the process result return
-     * @return true if success otherwise false
+     * @return true if valid otherwise false
      */
-    private boolean validSysTopicConfigure(TopicDeployEntity deployEntity,
-                                           StringBuilder strBuff, ProcessResult result) {
+    private boolean isValidSysTopicConf(TopicDeployEntity deployEntity,
+                                        StringBuilder strBuff, ProcessResult result) {
         if (!TServerConstants.OFFSET_HISTORY_NAME.equals(deployEntity.getTopicName())) {
             return true;
         }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
index b2917b7..aa4e13d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
@@ -211,46 +211,20 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
 
     // cluster default configure api
     @Override
-    public boolean addClusterConfig(ClusterSettingEntity entity,
-                                    StringBuilder strBuff, ProcessResult result) {
-        // check current status
-        if (!checkStoreStatus(true, result)) {
-            return result.isSuccess();
-        }
-        if (clusterConfigMapper.addClusterConfig(entity, strBuff, result)) {
-            strBuff.append("[addClusterConfig], ").append(entity.getCreateUser())
-                    .append(" added cluster setting record :").append(entity);
-            logger.info(strBuff.toString());
-        } else {
-            strBuff.append("[addClusterConfig], ")
-                    .append("failure to add cluster setting record : ")
-                    .append(result.getErrMsg());
-            logger.warn(strBuff.toString());
-        }
-        strBuff.delete(0, strBuff.length());
-        return result.isSuccess();
-    }
-
-    @Override
-    public boolean updClusterConfig(ClusterSettingEntity entity,
-                                    StringBuilder strBuff, ProcessResult result) {
+    public boolean addUpdClusterConfig(ClusterSettingEntity entity,
+                                       StringBuilder strBuff, ProcessResult result) {
         // check current status
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        if (clusterConfigMapper.updClusterConfig(entity, strBuff, result)) {
-            ClusterSettingEntity oldEntity =
-                    (ClusterSettingEntity) result.getRetData();
-            ClusterSettingEntity curEntity =
-                    clusterConfigMapper.getClusterConfig();
-            strBuff.append("[updClusterConfig], ")
-                    .append(entity.getModifyUser())
-                    .append(" updated record from :").append(oldEntity.toString())
-                    .append(" to ").append(curEntity.toString());
+        if (clusterConfigMapper.addUpdClusterConfig(entity, strBuff, result)) {
+            ClusterSettingEntity newEntity = clusterConfigMapper.getClusterConfig();
+            strBuff.append("[insertClusterConfig], ").append(entity.getCreateUser())
+                    .append(" insert cluster setting record :").append(newEntity);
             logger.info(strBuff.toString());
         } else {
-            strBuff.append("[updClusterConfig], ")
-                    .append("failure to update record : ")
+            strBuff.append("[addUpdClusterConfig], ")
+                    .append("failure to insert cluster setting record : ")
                     .append(result.getErrMsg());
             logger.warn(strBuff.toString());
         }
@@ -316,16 +290,15 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
+        BrokerConfEntity curEntity =
+                brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
         if (brokerConfigMapper.updBrokerConf(entity, strBuff, result)) {
-            BrokerConfEntity oldEntity =
-                    (BrokerConfEntity) result.getRetData();
-            BrokerConfEntity curEntity =
+            BrokerConfEntity newEntity =
                     brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
             strBuff.append("[updBrokerConf], ")
                     .append(entity.getModifyUser())
                     .append(" updated broker configure record from :")
-                    .append(oldEntity.toString())
-                    .append(" to ").append(curEntity.toString());
+                    .append(curEntity).append(" to ").append(newEntity);
             logger.info(strBuff.toString());
         } else {
             strBuff.append("[updBrokerConf], ")
@@ -391,7 +364,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        if (topicDeployMapper.addTopicConf(entity, strBuff, result)) {
+        if (topicDeployMapper.addTopicDeployConf(entity, strBuff, result)) {
             strBuff.append("[addTopicConf], ").append(entity.getCreateUser())
                     .append(" added topic configure record :").append(entity);
             logger.info(strBuff.toString());
@@ -412,16 +385,14 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        if (topicDeployMapper.updTopicConf(entity, strBuff, result)) {
-            TopicDeployEntity oldEntity =
-                    (TopicDeployEntity) result.getRetData();
-            TopicDeployEntity curEntity =
+        TopicDeployEntity curEntity =
+                topicDeployMapper.getTopicConfByeRecKey(entity.getRecordKey());
+        if (topicDeployMapper.updTopicDeployConf(entity, strBuff, result)) {
+            TopicDeployEntity newEntity =
                     topicDeployMapper.getTopicConfByeRecKey(entity.getRecordKey());
-            strBuff.append("[updTopicConf], ")
-                    .append(entity.getModifyUser())
-                    .append(" updated record from :")
-                    .append(oldEntity.toString())
-                    .append(" to ").append(curEntity.toString());
+            strBuff.append("[updTopicConf], ").append(entity.getModifyUser())
+                    .append(" updated record from :").append(curEntity)
+                    .append(" to ").append(newEntity);
             logger.info(strBuff.toString());
         } else {
             strBuff.append("[updTopicConf], ")
@@ -440,7 +411,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
-        if (topicDeployMapper.delTopicConf(recordKey, strBuff, result)) {
+        if (topicDeployMapper.delTopicDeployConf(recordKey, strBuff, result)) {
             GroupResCtrlEntity entity =
                     (GroupResCtrlEntity) result.getRetData();
             if (entity != null) {
@@ -573,15 +544,14 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
+        TopicCtrlEntity curEntity =
+                topicCtrlMapper.getTopicCtrlConf(entity.getTopicName());
         if (topicCtrlMapper.updTopicCtrlConf(entity, strBuff, result)) {
-            TopicCtrlEntity oldEntity =
-                    (TopicCtrlEntity) result.getRetData();
-            TopicCtrlEntity curEntity =
+            TopicCtrlEntity newEntity =
                     topicCtrlMapper.getTopicCtrlConf(entity.getTopicName());
-            strBuff.append("[updTopicCtrlConf], ")
-                    .append(entity.getModifyUser())
-                    .append(" updated record from :").append(oldEntity.toString())
-                    .append(" to ").append(curEntity.toString());
+            strBuff.append("[updTopicCtrlConf], ").append(entity.getModifyUser())
+                    .append(" updated record from :").append(curEntity)
+                    .append(" to ").append(newEntity);
             logger.info(strBuff.toString());
         } else {
             strBuff.append("[updTopicCtrlConf], ")
@@ -663,16 +633,14 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
+        GroupResCtrlEntity curEntity =
+                groupResCtrlMapper.getGroupResCtrlConf(entity.getGroupName());
         if (groupResCtrlMapper.updGroupResCtrlConf(entity, strBuff, result)) {
-            GroupResCtrlEntity oldEntity =
-                    (GroupResCtrlEntity) result.getRetData();
-            GroupResCtrlEntity curEntity =
+            GroupResCtrlEntity newEntity =
                     groupResCtrlMapper.getGroupResCtrlConf(entity.getGroupName());
-            strBuff.append("[updGroupResCtrlConf], ")
-                    .append(entity.getModifyUser())
-                    .append(" updated record from :")
-                    .append(oldEntity.toString())
-                    .append(" to ").append(curEntity.toString());
+            strBuff.append("[updGroupResCtrlConf], ").append(entity.getModifyUser())
+                    .append(" updated record from :").append(curEntity)
+                    .append(" to ").append(newEntity);
             logger.info(strBuff.toString());
         } else {
             strBuff.append("[updGroupResCtrlConf], ")
@@ -748,15 +716,14 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
         if (!checkStoreStatus(true, result)) {
             return result.isSuccess();
         }
+        GroupConsumeCtrlEntity curEntity =
+                consumeCtrlMapper.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
         if (consumeCtrlMapper.updGroupConsumeCtrlConf(entity, strBuff, result)) {
-            GroupConsumeCtrlEntity oldEntity =
-                    (GroupConsumeCtrlEntity) result.getRetData();
-            GroupConsumeCtrlEntity curEntity =
+            GroupConsumeCtrlEntity newEntity =
                     consumeCtrlMapper.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
-            strBuff.append("[updGroupConsumeCtrlConf], ")
-                    .append(entity.getModifyUser())
-                    .append(" updated record from :").append(oldEntity.toString())
-                    .append(" to ").append(curEntity.toString());
+            strBuff.append("[updGroupConsumeCtrlConf], ").append(entity.getModifyUser())
+                    .append(" updated record from :").append(curEntity)
+                    .append(" to ").append(newEntity);
             logger.info(strBuff.toString());
         } else {
             strBuff.append("[updGroupConsumeCtrlConf], ")
@@ -825,12 +792,12 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
 
     @Override
     public boolean isTopicNameInUsed(String topicName) {
-        return consumeCtrlMapper.isTopicNameInUsed(topicName);
+        return consumeCtrlMapper.isTopicNameInUse(topicName);
     }
 
     @Override
     public boolean hasGroupConsumeCtrlConf(String groupName) {
-        return consumeCtrlMapper.hasGroupConsumeCtrlConf(groupName);
+        return consumeCtrlMapper.isGroupNameInUse(groupName);
     }
 
     @Override