You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/03/25 01:38:33 UTC

[incubator-tubemq] branch TUBEMQ-570 updated: [TUBEMQ-581] Add data cache in BDB metadata Mapper implementations

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
     new 51d8138  [TUBEMQ-581] Add data cache in BDB metadata Mapper implementations
51d8138 is described below

commit 51d81383365359559a0dfde8fe8b869b2ba742b1
Author: gosonzhang <go...@tencent.com>
AuthorDate: Tue Mar 23 20:37:48 2021 +0800

    [TUBEMQ-581] Add data cache in BDB metadata Mapper implementations
---
 .../server/common/statusdef/TopicStatus.java       |   2 +-
 .../metastore/DataOpErrCode.java}                  |  23 ++-
 .../master/metastore/dao/entity/BaseEntity.java    |  46 ++++-
 .../metastore/dao/entity/BrokerConfEntity.java     |  74 +++++++
 .../metastore/dao/entity/ClusterSettingEntity.java |  32 +++
 .../metastore/dao/entity/GroupBlackListEntity.java |  46 +++++
 .../metastore/dao/entity/GroupConfigEntity.java    |  74 +++++++
 .../dao/entity/GroupFilterCtrlEntity.java          |  54 +++++
 .../metastore/dao/entity/TopicConfEntity.java      |  56 +++++-
 .../metastore/dao/entity/TopicCtrlEntity.java      |  60 ++++++
 .../metastore/dao/entity/TopicPropGroup.java       |  68 ++++++-
 .../metastore/dao/mapper/AbstractMapper.java       |   3 +-
 .../metastore/dao/mapper/BrokerConfigMapper.java   |  10 +-
 .../metastore/dao/mapper/ClusterConfigMapper.java  |   8 +-
 .../metastore/dao/mapper/GroupBlackListMapper.java |  18 +-
 .../metastore/dao/mapper/GroupConfigMapper.java    |  11 +-
 .../dao/mapper/GroupFilterCtrlMapper.java          |  11 +-
 .../metastore/dao/mapper/TopicConfigMapper.java    |  10 +-
 .../metastore/dao/mapper/TopicCtrlMapper.java      |  13 +-
 .../impl/bdbimpl/BdbBrokerConfigMapperImpl.java    | 117 +++++++++--
 .../impl/bdbimpl/BdbClusterConfigMapperImpl.java   |  87 ++++++--
 .../impl/bdbimpl/BdbGroupBlackListMapperImpl.java  | 223 +++++++++++++++++++--
 .../impl/bdbimpl/BdbGroupConfigMapperImpl.java     | 107 ++++++++--
 .../impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java | 183 +++++++++++++++--
 .../impl/bdbimpl/BdbTopicConfigMapperImpl.java     | 164 +++++++++++++--
 .../impl/bdbimpl/BdbTopicCtrlMapperImpl.java       | 106 ++++++++--
 .../AliveObserver.java}                            |  11 +-
 .../KeepAlive.java}                                |  20 +-
 28 files changed, 1488 insertions(+), 149 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
index f54ab7e..13384a4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
@@ -18,7 +18,7 @@
 package org.apache.tubemq.server.common.statusdef;
 
 public enum TopicStatus {
-
+    STATUS_TOPIC_UNDEFINED(-2, "Undefined."),
     STATUS_TOPIC_OK(0, "Normal"),
     STATUS_TOPIC_SOFT_DELETE(1, "Soft deleted"),
     STATUS_TOPIC_SOFT_REMOVE(2, "Soft removed"),
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/DataOpErrCode.java
similarity index 66%
copy from tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
copy to tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/DataOpErrCode.java
index f54ab7e..7f1d6c3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/DataOpErrCode.java
@@ -15,20 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.server.common.statusdef;
+package org.apache.tubemq.server.master.metastore;
 
-public enum TopicStatus {
 
-    STATUS_TOPIC_OK(0, "Normal"),
-    STATUS_TOPIC_SOFT_DELETE(1, "Soft deleted"),
-    STATUS_TOPIC_SOFT_REMOVE(2, "Soft removed"),
-    STATUS_TOPIC_HARD_REMOVE(3, "Hard removed");
+public enum DataOpErrCode {
+    DERR_SUCCESS(200, "Success."),
+    DERR_NOT_EXIST(401, "Record not exist."),
+    DERR_EXISTED(402, "Record has existed."),
+    DERR_UNCHANGED(403, "Record not changed."),
+    DERR_STORE_ABNORMAL(501, "Store layer throw exception."),
+
+    STATUS_DISABLE(0, "Disable.");
 
     private int code;
     private String description;
 
 
-    TopicStatus(int code, String description) {
+    DataOpErrCode(int code, String description) {
         this.code = code;
         this.description = description;
     }
@@ -41,13 +44,13 @@ public enum TopicStatus {
         return description;
     }
 
-    public static TopicStatus valueOf(int code) {
-        for (TopicStatus status : TopicStatus.values()) {
+    public static DataOpErrCode valueOf(int code) {
+        for (DataOpErrCode status : DataOpErrCode.values()) {
             if (status.getCode() == code) {
                 return status;
             }
         }
-        throw new IllegalArgumentException(String.format("unknown topic status code %s", code));
+        throw new IllegalArgumentException(String.format("unknown data operate error code %s", code));
     }
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java
index 2ffebe4..9aacb03 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java
@@ -20,11 +20,12 @@ package org.apache.tubemq.server.master.metastore.dao.entity;
 import com.google.gson.Gson;
 import java.io.Serializable;
 import java.util.Date;
+import java.util.Objects;
+import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.TServerConstants;
 
 
-
 // AbstractEntity: entity's abstract class
 public class BaseEntity implements Serializable {
 
@@ -131,4 +132,47 @@ public class BaseEntity implements Serializable {
         return gson.toJson(this);
     }
 
+    /**
+     * Check whether this entity matches the query items set in the target.
+     * Items compared when set: dataVersionId, createUser, modifyUser
+     * @param target  the query condition; null matches every entity
+     * @return true: matched, false: not matched
+     */
+    public boolean isMatched(BaseEntity target) {
+        if (target == null) {
+            return true;
+        }
+        if ((target.getDataVersionId() != TBaseConstants.META_VALUE_UNDEFINED
+                && this.getDataVersionId() != target.getDataVersionId())
+                || (TStringUtils.isNotBlank(target.getCreateUser())
+                && !target.getCreateUser().equals(createUser))
+                || (TStringUtils.isNotBlank(target.getModifyUser())
+                && !target.getModifyUser().equals(modifyUser))) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof BaseEntity)) {
+            return false;
+        }
+        BaseEntity that = (BaseEntity) o;
+        return dataVersionId == that.dataVersionId &&
+                Objects.equals(createUser, that.createUser) &&
+                Objects.equals(createDate, that.createDate) &&
+                Objects.equals(modifyUser, that.modifyUser) &&
+                Objects.equals(modifyDate, that.modifyDate) &&
+                Objects.equals(attributes, that.attributes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(dataVersionId, createUser,
+                createDate, modifyUser, modifyDate, attributes);
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java
index 3a0feb0..49ae565 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java
@@ -18,8 +18,10 @@
 package org.apache.tubemq.server.master.metastore.dao.entity;
 
 import java.util.Date;
+import java.util.Objects;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.statusdef.ManageStatus;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
 
@@ -264,4 +266,76 @@ public class BrokerConfEntity extends BaseEntity  {
                 .append(TokenConstants.ATTR_SEP).append(brokerTLSPort).toString();
     }
 
+    /**
+     * Check whether this entity (base-class items included) matches the target.
+     * Items compared when set: brokerId, brokerIp, brokerPort, brokerTLSPort,
+     *   regionId, groupId, manageStatus, brokerWebPort
+     * @param target  the query condition; null matches every entity
+     * @return true: matched, false: not matched
+     */
+    public boolean isMatched(BrokerConfEntity target) {
+        if (target == null) {
+            return true;
+        }
+        if (!super.isMatched(target)) {
+            return false;
+        }
+        if ((target.getBrokerId() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getBrokerId() != this.brokerId)
+                || (TStringUtils.isNotBlank(target.getBrokerIp())
+                && !target.getBrokerIp().equals(this.brokerIp))
+                || (target.getBrokerPort() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getBrokerPort() != this.brokerPort)
+                || (target.getBrokerTLSPort() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getBrokerTLSPort() != this.brokerTLSPort)
+                || (target.getRegionId() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getRegionId() != this.regionId)
+                || (target.getGroupId() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getGroupId() != this.groupId)
+                || (target.getManageStatus() != ManageStatus.STATUS_MANAGE_UNDEFINED
+                && target.getManageStatus() != this.manageStatus)
+                || (target.getBrokerWebPort() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getBrokerWebPort() != this.brokerWebPort)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof BrokerConfEntity)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        BrokerConfEntity entity = (BrokerConfEntity) o;
+        return brokerId == entity.brokerId &&
+                brokerPort == entity.brokerPort &&
+                brokerTLSPort == entity.brokerTLSPort &&
+                brokerWebPort == entity.brokerWebPort &&
+                isConfDataUpdated == entity.isConfDataUpdated &&
+                isBrokerLoaded == entity.isBrokerLoaded &&
+                regionId == entity.regionId &&
+                groupId == entity.groupId &&
+                Objects.equals(brokerIp, entity.brokerIp) &&
+                manageStatus == entity.manageStatus &&
+                Objects.equals(topicProps, entity.topicProps) &&
+                Objects.equals(brokerAddress, entity.brokerAddress) &&
+                Objects.equals(brokerFullInfo, entity.brokerFullInfo) &&
+                Objects.equals(brokerSimpleInfo, entity.brokerSimpleInfo) &&
+                Objects.equals(brokerTLSSimpleInfo, entity.brokerTLSSimpleInfo) &&
+                Objects.equals(brokerTLSFullInfo, entity.brokerTLSFullInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), brokerId, brokerIp, brokerPort, brokerTLSPort,
+                brokerWebPort, manageStatus, isConfDataUpdated, isBrokerLoaded, regionId,
+                groupId, topicProps, brokerAddress, brokerFullInfo, brokerSimpleInfo,
+                brokerTLSSimpleInfo, brokerTLSFullInfo);
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java
index 3f54faa..a64f350 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java
@@ -17,12 +17,14 @@
 
 package org.apache.tubemq.server.master.metastore.dao.entity;
 
+import java.util.Objects;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.server.common.statusdef.EnableStatus;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
 import org.apache.tubemq.server.master.metastore.TStoreConstants;
 
 
+
 /*
  * store the cluster default setting
  *
@@ -179,4 +181,34 @@ public class ClusterSettingEntity extends BaseEntity {
         this.clsDefTopicProps = clsDefTopicProps;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ClusterSettingEntity)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        ClusterSettingEntity entity = (ClusterSettingEntity) o;
+        return brokerPort == entity.brokerPort &&
+                brokerTLSPort == entity.brokerTLSPort &&
+                brokerWebPort == entity.brokerWebPort &&
+                maxMsgSizeInB == entity.maxMsgSizeInB &&
+                qryPriorityId == entity.qryPriorityId &&
+                gloFlowCtrlRuleCnt == entity.gloFlowCtrlRuleCnt &&
+                Objects.equals(recordKey, entity.recordKey) && // null-safe, like hashCode()
+                Objects.equals(clsDefTopicProps, entity.clsDefTopicProps) &&
+                gloFlowCtrlStatus == entity.gloFlowCtrlStatus &&
+                Objects.equals(gloFlowCtrlRuleInfo, entity.gloFlowCtrlRuleInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), recordKey, brokerPort, brokerTLSPort,
+                brokerWebPort, clsDefTopicProps, maxMsgSizeInB, qryPriorityId,
+                gloFlowCtrlStatus, gloFlowCtrlRuleCnt, gloFlowCtrlRuleInfo);
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java
index b0ebd8c..3b680bf 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java
@@ -18,8 +18,10 @@
 package org.apache.tubemq.server.master.metastore.dao.entity;
 
 import java.util.Date;
+import java.util.Objects;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
 
 
@@ -87,4 +89,48 @@ public class GroupBlackListEntity extends BaseEntity {
         return reason;
     }
 
+    /**
+     * Check whether this entity (base-class items included) matches the target.
+     * Items compared when not blank in the target: groupName, topicName
+     * @param target  the query condition; null matches every entity
+     * @return true: matched, false: not matched
+     */
+    public boolean isMatched(GroupBlackListEntity target) {
+        if (target == null) {
+            return true;
+        }
+        if (!super.isMatched(target)) {
+            return false;
+        }
+        if ((TStringUtils.isNotBlank(target.getTopicName())
+                && !target.getTopicName().equals(this.topicName))
+                || (TStringUtils.isNotBlank(target.getGroupName())
+                && !target.getGroupName().equals(this.groupName))) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof GroupBlackListEntity)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        GroupBlackListEntity that = (GroupBlackListEntity) o;
+        return Objects.equals(recordKey, that.recordKey) &&
+                Objects.equals(topicName, that.topicName) &&
+                Objects.equals(groupName, that.groupName) &&
+                Objects.equals(reason, that.reason);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), recordKey, topicName, groupName, reason);
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java
index e24b511..f335ace 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java
@@ -18,7 +18,9 @@
 package org.apache.tubemq.server.master.metastore.dao.entity;
 
 import java.util.Date;
+import java.util.Objects;
 import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.statusdef.EnableStatus;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
 
@@ -149,4 +151,76 @@ public class GroupConfigEntity extends BaseEntity {
         this.allowedBrokerClientRate = allowedBrokerClientRate;
     }
 
+    public EnableStatus getResCheckStatus() {
+        return resCheckStatus;
+    }
+
+    public void setResCheckStatus(EnableStatus resCheckStatus) {
+        this.resCheckStatus = resCheckStatus;
+    }
+
+    public EnableStatus getFlowCtrlStatus() {
+        return flowCtrlStatus;
+    }
+
+    public void setFlowCtrlStatus(EnableStatus flowCtrlStatus) {
+        this.flowCtrlStatus = flowCtrlStatus;
+    }
+
+    /**
+     * Check whether this entity (base-class items included) matches the target.
+     * Items compared when set: groupName, qryPriorityId, resCheckStatus,
+     *   flowCtrlStatus, allowedBrokerClientRate
+     * @param target  the query condition; null matches every entity
+     * @return true: matched, false: not matched
+     */
+    public boolean isMatched(GroupConfigEntity target) {
+        if (target == null) {
+            return true;
+        }
+        if (!super.isMatched(target)) {
+            return false;
+        }
+        if ((target.getQryPriorityId() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getQryPriorityId() != this.qryPriorityId)
+                || (TStringUtils.isNotBlank(target.getGroupName())
+                && !target.getGroupName().equals(this.groupName))
+                || (target.getResCheckStatus() != EnableStatus.STATUS_UNDEFINE
+                && target.getResCheckStatus() != this.resCheckStatus)
+                || (target.getFlowCtrlStatus() != EnableStatus.STATUS_UNDEFINE
+                && target.getFlowCtrlStatus() != this.flowCtrlStatus)
+                || (target.getAllowedBrokerClientRate() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getAllowedBrokerClientRate() != this.allowedBrokerClientRate)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof GroupConfigEntity)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        GroupConfigEntity that = (GroupConfigEntity) o;
+        return allowedBrokerClientRate == that.allowedBrokerClientRate &&
+                qryPriorityId == that.qryPriorityId &&
+                ruleCnt == that.ruleCnt &&
+                Objects.equals(groupName, that.groupName) && // null-safe, like hashCode()
+                resCheckStatus == that.resCheckStatus &&
+                flowCtrlStatus == that.flowCtrlStatus &&
+                Objects.equals(flowCtrlInfo, that.flowCtrlInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), groupName, resCheckStatus,
+                allowedBrokerClientRate, qryPriorityId, flowCtrlStatus,
+                ruleCnt, flowCtrlInfo);
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
index f81c7c4..ab18c8f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
@@ -18,8 +18,10 @@
 package org.apache.tubemq.server.master.metastore.dao.entity;
 
 import java.util.Date;
+import java.util.Objects;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.statusdef.EnableStatus;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
 
@@ -125,4 +127,56 @@ public class GroupFilterCtrlEntity extends BaseEntity {
         this.filterCondStr = filterCondStr;
     }
 
+    public EnableStatus getFilterConsumeStatus() {
+        return filterConsumeStatus;
+    }
+
+    /**
+     * Check whether this entity (base-class items included) matches the target.
+     * Items compared when set: topicName, groupName, filterConsumeStatus
+     * @param target  the query condition; null matches every entity
+     * @return true: matched, false: not matched
+     */
+    public boolean isMatched(GroupFilterCtrlEntity target) {
+        if (target == null) {
+            return true;
+        }
+        if (!super.isMatched(target)) {
+            return false;
+        }
+        if ((TStringUtils.isNotBlank(target.getTopicName())
+                && !target.getTopicName().equals(this.topicName))
+                || (TStringUtils.isNotBlank(target.getGroupName())
+                && !target.getGroupName().equals(this.groupName))
+                || (target.getFilterConsumeStatus() != EnableStatus.STATUS_UNDEFINE
+                && target.getFilterConsumeStatus() != this.filterConsumeStatus)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof GroupFilterCtrlEntity)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        GroupFilterCtrlEntity that = (GroupFilterCtrlEntity) o;
+        return Objects.equals(recordKey, that.recordKey) && // null-safe, like hashCode()
+                Objects.equals(topicName, that.topicName) &&
+                Objects.equals(groupName, that.groupName) &&
+                filterConsumeStatus == that.filterConsumeStatus &&
+                Objects.equals(filterCondStr, that.filterCondStr);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), recordKey,
+                topicName, groupName, filterConsumeStatus, filterCondStr);
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java
index 52cfc61..b090329 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java
@@ -18,8 +18,10 @@
 package org.apache.tubemq.server.master.metastore.dao.entity;
 
 import java.util.Date;
+import java.util.Objects;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.statusdef.TopicStatus;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
 
@@ -35,7 +37,7 @@ public class TopicConfEntity extends BaseEntity {
     private int brokerId = TBaseConstants.META_VALUE_UNDEFINED;
     // topic id, require globally unique
     private int topicId = TBaseConstants.META_VALUE_UNDEFINED;
-    private TopicStatus deployStatus = TopicStatus.STATUS_TOPIC_OK;  // topic status
+    private TopicStatus deployStatus = TopicStatus.STATUS_TOPIC_UNDEFINED;  // topic status
     private TopicPropGroup topicProps = null;
 
 
@@ -140,4 +142,56 @@ public class TopicConfEntity extends BaseEntity {
     public boolean isValidTopicStatus() {
         return this.deployStatus == TopicStatus.STATUS_TOPIC_OK;
     }
+
+    /**
+     * Check whether this entity (base-class items included) matches the target.
+     * Items compared when set: brokerId, topicId, topicName, topicStatus (deployStatus)
+     * @param target  the query condition; null matches every entity
+     * @return true: matched, false: not matched
+     */
+    public boolean isMatched(TopicConfEntity target) {
+        if (target == null) {
+            return true;
+        }
+        if (!super.isMatched(target)) {
+            return false;
+        }
+        if ((target.getBrokerId() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getBrokerId() != this.brokerId)
+                || (target.getTopicId() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getTopicId() != this.topicId)
+                || (TStringUtils.isNotBlank(target.getTopicName())
+                && !target.getTopicName().equals(this.topicName))
+                || (target.getTopicStatus() != TopicStatus.STATUS_TOPIC_UNDEFINED
+                && target.getTopicStatus() != this.deployStatus)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof TopicConfEntity)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        TopicConfEntity that = (TopicConfEntity) o;
+        return brokerId == that.brokerId &&
+                topicId == that.topicId &&
+                Objects.equals(recordKey, that.recordKey) &&
+                Objects.equals(topicName, that.topicName) &&
+                deployStatus == that.deployStatus &&
+                Objects.equals(topicProps, that.topicProps);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), recordKey,
+                topicName, brokerId, topicId, deployStatus, topicProps);
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java
index 406da37..582672a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java
@@ -18,7 +18,9 @@
 package org.apache.tubemq.server.master.metastore.dao.entity;
 
 import java.util.Date;
+import java.util.Objects;
 import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.statusdef.EnableStatus;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
 
@@ -92,4 +94,62 @@ public class TopicCtrlEntity extends BaseEntity {
             this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
         }
     }
+
+    public EnableStatus getAuthCtrlStatus() {
+        return authCtrlStatus;
+    }
+
+    public int getMaxMsgSizeInB() {
+        return maxMsgSizeInB;
+    }
+
+    public void setMaxMsgSizeInB(int maxMsgSizeInB) {
+        this.maxMsgSizeInB = maxMsgSizeInB;
+    }
+
+    /**
+     * Check whether this entity (base-class items included) matches the target.
+     * Items compared when set: topicName, maxMsgSizeInB, authCtrlStatus
+     * @param target  the query condition; null matches every entity
+     * @return true: matched, false: not matched
+     */
+    public boolean isMatched(TopicCtrlEntity target) {
+        if (target == null) {
+            return true;
+        }
+        if (!super.isMatched(target)) {
+            return false;
+        }
+        if ((target.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED
+                && target.getMaxMsgSizeInB() != this.maxMsgSizeInB)
+                || (TStringUtils.isNotBlank(target.getTopicName())
+                && !target.getTopicName().equals(this.topicName))
+                || (target.getAuthCtrlStatus() != EnableStatus.STATUS_UNDEFINE
+                && target.getAuthCtrlStatus() != this.authCtrlStatus)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof TopicCtrlEntity)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        TopicCtrlEntity that = (TopicCtrlEntity) o;
+        return maxMsgSizeInB == that.maxMsgSizeInB &&
+                Objects.equals(topicName, that.topicName) && // null-safe, like hashCode()
+                authCtrlStatus == that.authCtrlStatus;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), topicName, authCtrlStatus, maxMsgSizeInB);
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java
index 2567f96..6c2e088 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java
@@ -18,8 +18,9 @@
 package org.apache.tubemq.server.master.metastore.dao.entity;
 
 import java.io.Serializable;
+import java.util.Objects;
 import org.apache.tubemq.corebase.TBaseConstants;
-
+import org.apache.tubemq.corebase.utils.TStringUtils;
 
 
 /*
@@ -167,4 +168,69 @@ public class TopicPropGroup implements Serializable {
     public int getDataStoreType() {
         return dataStoreType;
     }
+
+    /**
+     * Check whether this property group satisfies the given query template.
+     * A template item is compared only when it is set: numeric items must differ from
+     * TBaseConstants.META_VALUE_UNDEFINED, deletePolicy must not be blank.
+     * Allowed query items:
+     *   numTopicStores, numPartitions, unflushThreshold, unflushInterval, unflushDataHold,
+     *   memCacheMsgSizeInMB, memCacheMsgCntInK, memCacheFlushIntvl, deletePolicy
+     * @param target the query template, null matches everything
+     * @return true: matched, false: not match
+     */
+    public boolean isMatched(TopicPropGroup target) {
+        if (target == null) {
+            return true;
+        }
+        return (target.getNumTopicStores() == TBaseConstants.META_VALUE_UNDEFINED
+                || target.getNumTopicStores() == this.numTopicStores)
+                && (target.getNumPartitions() == TBaseConstants.META_VALUE_UNDEFINED
+                || target.getNumPartitions() == this.numPartitions)
+                && (target.getUnflushThreshold() == TBaseConstants.META_VALUE_UNDEFINED
+                || target.getUnflushThreshold() == this.unflushThreshold)
+                && (target.getUnflushInterval() == TBaseConstants.META_VALUE_UNDEFINED
+                || target.getUnflushInterval() == this.unflushInterval)
+                && (target.getUnflushDataHold() == TBaseConstants.META_VALUE_UNDEFINED
+                || target.getUnflushDataHold() == this.unflushDataHold)
+                && (target.getMemCacheMsgSizeInMB() == TBaseConstants.META_VALUE_UNDEFINED
+                || target.getMemCacheMsgSizeInMB() == this.memCacheMsgSizeInMB)
+                && (target.getMemCacheMsgCntInK() == TBaseConstants.META_VALUE_UNDEFINED
+                || target.getMemCacheMsgCntInK() == this.memCacheMsgCntInK)
+                && (target.getMemCacheFlushIntvl() == TBaseConstants.META_VALUE_UNDEFINED
+                || target.getMemCacheFlushIntvl() == this.memCacheFlushIntvl)
+                && (!TStringUtils.isNotBlank(target.getDeletePolicy())
+                || target.getDeletePolicy().equals(this.deletePolicy));
+    }
+
+    /** Field-by-field equality; deletePolicy and dataPath are compared null-safely. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof TopicPropGroup)) {
+            return false;
+        }
+        final TopicPropGroup other = (TopicPropGroup) o;
+        return numTopicStores == other.numTopicStores && numPartitions == other.numPartitions
+                && unflushThreshold == other.unflushThreshold
+                && unflushInterval == other.unflushInterval
+                && unflushDataHold == other.unflushDataHold
+                && memCacheMsgSizeInMB == other.memCacheMsgSizeInMB
+                && memCacheMsgCntInK == other.memCacheMsgCntInK
+                && memCacheFlushIntvl == other.memCacheFlushIntvl
+                && acceptPublish == other.acceptPublish
+                && acceptSubscribe == other.acceptSubscribe
+                && dataStoreType == other.dataStoreType
+                && Objects.equals(deletePolicy, other.deletePolicy)
+                && Objects.equals(dataPath, other.dataPath);
+    }
+
+    @Override
+    public int hashCode() { // hashes exactly the 13 fields that equals() compares
+        return Objects.hash(numTopicStores, numPartitions, unflushThreshold, unflushInterval,
+                unflushDataHold, memCacheMsgSizeInMB, memCacheMsgCntInK, memCacheFlushIntvl,
+                acceptPublish, acceptSubscribe, deletePolicy, dataStoreType, dataPath);
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
index 7deadef..dbed1c9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
@@ -18,14 +18,13 @@
 package org.apache.tubemq.server.master.metastore.dao.mapper;
 
 import org.apache.tubemq.server.common.exception.LoadMetaException;
-import org.apache.tubemq.server.common.utils.ProcessResult;
 
 
 public interface AbstractMapper {
 
     void close();
 
-    void loadConfig(ProcessResult result) throws LoadMetaException;
+    void loadConfig() throws LoadMetaException;
 
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java
index ec5bd46..7155298 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java
@@ -17,15 +17,21 @@
 
 package org.apache.tubemq.server.master.metastore.dao.mapper;
 
+import java.util.Map;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.metastore.dao.entity.BrokerConfEntity;
 
 
+
 public interface BrokerConfigMapper extends AbstractMapper {
 
-    boolean putBrokerConfig(BrokerConfEntity memEntity, ProcessResult result);
+    boolean addBrokerConf(BrokerConfEntity memEntity, ProcessResult result);
+
+    boolean updBrokerConf(BrokerConfEntity memEntity, ProcessResult result);
 
-    boolean delBrokerConfig(int brokerId);
+    boolean delBrokerConf(int brokerId);
 
+    Map<Integer, BrokerConfEntity> getBrokerConfByBrokerId(BrokerConfEntity qryEntity);
 
+    BrokerConfEntity getBrokerConfByBrokerId(int brokerId);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
index 6fcb5e4..e9b7827 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
@@ -23,7 +23,11 @@ import org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity
 
 public interface ClusterConfigMapper extends AbstractMapper {
 
-    boolean putClusterConfig(ClusterSettingEntity memEntity, ProcessResult result);
+    boolean addClusterConfig(ClusterSettingEntity memEntity, ProcessResult result);
 
-    boolean delClusterConfig(String key);
+    boolean updClusterConfig(ClusterSettingEntity memEntity, ProcessResult result);
+
+    ClusterSettingEntity getClusterConfig();
+
+    boolean delClusterConfig();
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
index e9f49c2..645f139 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
@@ -17,15 +17,29 @@
 
 package org.apache.tubemq.server.master.metastore.dao.mapper;
 
+import java.util.List;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.metastore.dao.entity.GroupBlackListEntity;
 
 
+
+
 public interface GroupBlackListMapper extends AbstractMapper {
 
-    boolean putGroupBlackListConfig(GroupBlackListEntity memEntity, ProcessResult result);
+    // about group blacklist api
+    boolean addGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result);
+
+    boolean updGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result);
+
+    boolean delGroupBlackListConf(String recordKey);
+
+    boolean delGroupBlackListConfByGroupName(String groupName);
+
+    List<GroupBlackListEntity> getGrpBlkLstConfByGroupName(String groupName);
+
+    List<GroupBlackListEntity> getGrpBlkLstConfByTopicName(String topicName);
 
-    boolean delGroupBlackListConfig(String key);
+    List<GroupBlackListEntity> getGroupBlackListConf(GroupBlackListEntity qryEntity);
 
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
index e01775c..123c39f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.tubemq.server.master.metastore.dao.mapper;
 
+import java.util.Map;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
 
@@ -24,9 +25,15 @@ import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
 
 public interface GroupConfigMapper extends AbstractMapper {
 
-    boolean putGroupConfigConfig(GroupConfigEntity memEntity, ProcessResult result);
+    boolean addGroupConf(GroupConfigEntity entity, ProcessResult result);
 
-    boolean delGroupConfigConfig(String key);
+    boolean updGroupConf(GroupConfigEntity entity, ProcessResult result);
+
+    boolean delGroupConf(String groupName);
+
+    GroupConfigEntity getGroupConf(String groupName);
+
+    Map<String, GroupConfigEntity> getGroupConf(GroupConfigEntity qryEntity);
 
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
index 1925b41..a155f7a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
@@ -17,15 +17,22 @@
 
 package org.apache.tubemq.server.master.metastore.dao.mapper;
 
+import java.util.List;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntity;
 
 
+
 public interface GroupFilterCtrlMapper extends AbstractMapper {
 
-    boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity memEntity, ProcessResult result);
+    boolean addGroupFilterCtrlConf(GroupFilterCtrlEntity entity, ProcessResult result);
+
+    boolean updGroupFilterCtrlConf(GroupFilterCtrlEntity entity, ProcessResult result);
+
+    boolean delGroupFilterCtrlConf(String recordKey);
 
-    boolean delGroupFilterCtrlConfig(String key);
+    List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(String groupName);
 
+    List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(GroupFilterCtrlEntity qryEntity);
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
index fda4f6c..25a127c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
@@ -17,15 +17,21 @@
 
 package org.apache.tubemq.server.master.metastore.dao.mapper;
 
+import java.util.List;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
 
 
+
 public interface TopicConfigMapper extends AbstractMapper {
 
-    boolean putTopicConfig(TopicConfEntity memEntity, ProcessResult result);
+    boolean addTopicConf(TopicConfEntity entity, ProcessResult result);
+
+    boolean updTopicConf(TopicConfEntity entity, ProcessResult result);
+
+    boolean delTopicConf(String recordKey);
 
-    boolean delTopicConfig(String key);
+    List<TopicConfEntity> getTopicConf(TopicConfEntity qryEntity);
 
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
index c42ea45..5b66618 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
@@ -17,16 +17,25 @@
 
 package org.apache.tubemq.server.master.metastore.dao.mapper;
 
+import java.util.List;
 import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
 import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
 
 
 
+
 public interface TopicCtrlMapper extends AbstractMapper {
 
-    boolean putTopicCtrlConfig(TopicCtrlEntity memEntity, ProcessResult result);
+    boolean addTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
+
+    boolean updTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
+
+    boolean delTopicCtrlConf(String recordKey);
+
+    TopicCtrlEntity getTopicCtrlConf(String topicName);
 
-    boolean delTopicCtrlConfig(String key);
+    List<TopicCtrlEntity> getTopicCtrlConf(TopicConfEntity qryEntity); // NOTE(review): Conf, not Ctrl, query type - confirm
 
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index 8d1c693..5c11f4e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -25,10 +25,12 @@ import com.sleepycat.persist.PrimaryIndex;
 import com.sleepycat.persist.StoreConfig;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.server.common.exception.LoadMetaException;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
 import org.apache.tubemq.server.master.metastore.dao.entity.BrokerConfEntity;
 import org.apache.tubemq.server.master.metastore.dao.mapper.BrokerConfigMapper;
 import org.slf4j.Logger;
@@ -41,11 +43,11 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
     private static final Logger logger =
             LoggerFactory.getLogger(BdbBrokerConfigMapperImpl.class);
 
-
     // broker config store
     private EntityStore brokerConfStore;
     private PrimaryIndex<Integer/* brokerId */, BdbBrokerConfEntity> brokerConfIndex;
-
+    private ConcurrentHashMap<Integer/* brokerId */, BrokerConfEntity> metaDataCache =
+            new ConcurrentHashMap<>();
 
     public BdbBrokerConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
         brokerConfStore = new EntityStore(repEnv,
@@ -56,6 +58,7 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
 
     @Override
     public void close() {
+        metaDataCache.clear();
         if (brokerConfStore != null) {
             try {
                 brokerConfStore.close();
@@ -67,12 +70,12 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
     }
 
     @Override
-    public void loadConfig(ProcessResult result) throws LoadMetaException {
+    public void loadConfig() throws LoadMetaException {
         long count = 0L;
-        Map<Integer, BrokerConfEntity> metaDataMap = new HashMap<>();
         EntityCursor<BdbBrokerConfEntity> cursor = null;
         logger.info("[BDB Impl] load broker configure start...");
         try {
+            metaDataCache.clear();
             cursor = brokerConfIndex.entities();
             for (BdbBrokerConfEntity bdbEntity : cursor) {
                 if (bdbEntity == null) {
@@ -81,11 +84,10 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
                 }
                 BrokerConfEntity memEntity =
                         new BrokerConfEntity(bdbEntity);
-                metaDataMap.put(memEntity.getBrokerId(), memEntity);
+                metaDataCache.put(memEntity.getBrokerId(), memEntity);
                 count++;
             }
             logger.info("[BDB Impl] total broker configure records are {}", count);
-            result.setSuccResult(metaDataMap);
         } catch (Exception e) {
             logger.error("[BDB Impl] load broker configure failure ", e);
             throw new LoadMetaException(e.getMessage());
@@ -97,6 +99,96 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
         logger.info("[BDB Impl] load broker configure successfully...");
     }
 
+    /**
+     * Add a broker configure record; fails with DERR_EXISTED if the brokerId is cached already.
+     */
+    @Override
+    public boolean addBrokerConf(BrokerConfEntity memEntity, ProcessResult result) {
+        final int brokerId = memEntity.getBrokerId();
+        if (metaDataCache.containsKey(brokerId)) {
+            String errInfo = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("The broker ").append(memEntity.getBrokerIp())
+                    .append("'s configure already exists, please delete it first!").toString();
+            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), errInfo);
+        } else if (putBrokerConfig2Bdb(memEntity, result)) {
+            // persisted successfully, now publish the record to the cache
+            metaDataCache.put(brokerId, memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Update a cached broker configure; fails if it is absent or equals the current record.
+     */
+    @Override
+    public boolean updBrokerConf(BrokerConfEntity memEntity, ProcessResult result) {
+        BrokerConfEntity curEntity = metaDataCache.get(memEntity.getBrokerId());
+        if (curEntity == null) {
+            String errInfo = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("The broker ").append(memEntity.getBrokerIp())
+                    .append("'s configure is not exists, please add record first!").toString();
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(), errInfo);
+            return result.isSuccess();
+        }
+        if (curEntity.equals(memEntity)) {
+            String errInfo = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("The broker ").append(memEntity.getBrokerIp())
+                    .append("'s configure have not changed!").toString();
+            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(), errInfo);
+            return result.isSuccess();
+        }
+        if (putBrokerConfig2Bdb(memEntity, result)) {
+            metaDataCache.put(memEntity.getBrokerId(), memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Delete a broker's configure from the bdb store, then evict it from the cache
+     * @return always true; NOTE(review): the bdb delete result is ignored - confirm intended
+     */
+    @Override
+    public boolean delBrokerConf(int brokerId) {
+        BrokerConfEntity curEntity =
+                metaDataCache.get(brokerId);
+        if (curEntity == null) {
+            return true;
+        }
+        delBrokerConfigFromBdb(brokerId);
+        metaDataCache.remove(brokerId);
+        return true;
+    }
+
+    /**
+     * Query broker configures from the in-memory cache.
+     * Despite its name, this overload filters by the whole query entity
+     * instead of looking up a single brokerId.
+     *
+     * @param qryEntity query template; null selects every cached record
+     * @return a new map keyed by brokerId holding the matched cached entities,
+     *         only read
+     */
+    @Override
+    public Map<Integer, BrokerConfEntity> getBrokerConfByBrokerId(BrokerConfEntity qryEntity) {
+        Map<Integer, BrokerConfEntity> matched = new HashMap<>();
+        for (BrokerConfEntity cached : metaDataCache.values()) {
+            // a null template short-circuits, so isMatched() never receives null here
+            if (qryEntity == null || cached.isMatched(qryEntity)) {
+                matched.put(cached.getBrokerId(), cached);
+            }
+        }
+        return matched;
+    }
+
+    /**
+     * get broker configure info from the in-memory cache by brokerId
+     * @return the cached record or null if absent, only read
+     */
+    @Override
+    public BrokerConfEntity getBrokerConfByBrokerId(int brokerId) {
+        return metaDataCache.get(brokerId);
+    }
+
     /**
      * Put cluster setting info into bdb store
      *
@@ -104,8 +196,7 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
      * @param result process result with old value
      * @return
      */
-    @Override
-    public boolean putBrokerConfig(BrokerConfEntity memEntity, ProcessResult result) {
+    private boolean putBrokerConfig2Bdb(BrokerConfEntity memEntity, ProcessResult result) {
         BdbBrokerConfEntity retData = null;
         BdbBrokerConfEntity bdbEntity =
                 memEntity.buildBdbBrokerConfEntity();
@@ -113,17 +204,17 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
             retData = brokerConfIndex.put(bdbEntity);
         } catch (Throwable e) {
             logger.error("[BDB Impl] put broker configure failure ", e);
-            result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
-                    .append("Put broker configure failure: ")
-                    .append(e.getMessage()).toString());
+            result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("Put broker configure failure: ")
+                            .append(e.getMessage()).toString());
             return result.isSuccess();
         }
         result.setSuccResult(retData == null);
         return result.isSuccess();
     }
 
-    @Override
-    public boolean delBrokerConfig(int brokerId) {
+    private boolean delBrokerConfigFromBdb(int brokerId) {
         try {
             brokerConfIndex.delete(brokerId);
         } catch (Throwable e) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
index b9fb67e..07413fd 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
@@ -22,12 +22,14 @@ import com.sleepycat.persist.EntityCursor;
 import com.sleepycat.persist.EntityStore;
 import com.sleepycat.persist.PrimaryIndex;
 import com.sleepycat.persist.StoreConfig;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.server.common.exception.LoadMetaException;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
 import org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity;
 import org.apache.tubemq.server.master.metastore.dao.mapper.ClusterConfigMapper;
 import org.slf4j.Logger;
@@ -41,7 +43,7 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
 
     private EntityStore clsDefSettingStore;
     private PrimaryIndex<String, BdbClusterSettingEntity> clsDefSettingIndex;
-
+    Map<String, ClusterSettingEntity> metaDataCache = new ConcurrentHashMap<>();
 
     public BdbClusterConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
         clsDefSettingStore = new EntityStore(repEnv,
@@ -52,6 +54,7 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
 
     @Override
     public void close() {
+        metaDataCache.clear();
         if (clsDefSettingStore != null) {
             try {
                 clsDefSettingStore.close();
@@ -63,12 +66,12 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
     }
 
     @Override
-    public void loadConfig(ProcessResult result) throws LoadMetaException {
+    public void loadConfig() throws LoadMetaException {
         long count = 0L;
-        Map<String, ClusterSettingEntity> metaDataMap = new HashMap<>();
         EntityCursor<BdbClusterSettingEntity> cursor = null;
         logger.info("[BDB Impl] load cluster configure start...");
         try {
+            metaDataCache.clear();
             cursor = clsDefSettingIndex.entities();
             for (BdbClusterSettingEntity bdbEntity : cursor) {
                 if (bdbEntity == null) {
@@ -77,11 +80,10 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
                 }
                 ClusterSettingEntity memEntity =
                         new ClusterSettingEntity(bdbEntity);
-                metaDataMap.put(memEntity.getRecordKey(), memEntity);
+                metaDataCache.put(memEntity.getRecordKey(), memEntity);
                 count++;
             }
             logger.info("[BDB Impl] total cluster configure records are {}", count);
-            result.setSuccResult(metaDataMap);
         } catch (Exception e) {
             logger.error("[BDB Impl] load cluster configure failure ", e);
             throw new LoadMetaException(e.getMessage());
@@ -101,7 +103,68 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
      * @return
      */
     @Override
-    public boolean putClusterConfig(ClusterSettingEntity memEntity, ProcessResult result) {
+    public boolean addClusterConfig(ClusterSettingEntity memEntity, ProcessResult result) {
+        // at most one setting record is allowed: any cached entry rejects the add
+        if (!metaDataCache.isEmpty()) {
+            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                    "The cluster setting already exists, please delete or update!");
+        } else if (putClusterConfig2Bdb(memEntity, result)) {
+            // persisted successfully, now publish the record to the cache
+            metaDataCache.put(memEntity.getRecordKey(), memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Update cluster setting info in bdb store
+     * @param memEntity  the new setting, looked up in the cache by its record key
+     * @param result     process result, carries the error code on failure
+     * @return result.isSuccess() after the operation
+     */
+    @Override
+    public boolean updClusterConfig(ClusterSettingEntity memEntity, ProcessResult result) {
+        // a key miss covers the empty cache too, and keeps equals() below off a null entry
+        ClusterSettingEntity curEntity = metaDataCache.get(memEntity.getRecordKey());
+        if (curEntity == null) {
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                    "The cluster setting is null, please add record first!");
+            return result.isSuccess();
+        }
+        if (curEntity.equals(memEntity)) {
+            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                    "The cluster settings have not changed!");
+            return result.isSuccess();
+        }
+        if (putClusterConfig2Bdb(memEntity, result)) {
+            metaDataCache.put(memEntity.getRecordKey(), memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * get current cluster setting from the in-memory cache
+     * @return current cluster setting, null or object, only read
+     */
+    @Override
+    public ClusterSettingEntity getClusterConfig() {
+        return metaDataCache.get(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+    }
+
+    /**
+     * delete the default cluster setting from bdb store, then clear the whole cache
+     * @return always true; NOTE(review): the bdb delete result is ignored - confirm intended
+     */
+    @Override
+    public boolean delClusterConfig() {
+        if (metaDataCache.isEmpty()) {
+            return true;
+        }
+        delClusterConfigFromBdb(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+        metaDataCache.clear();
+        return true;
+    }
+
+    private boolean putClusterConfig2Bdb(ClusterSettingEntity memEntity, ProcessResult result) {
         BdbClusterSettingEntity retData = null;
         BdbClusterSettingEntity bdbEntity =
                 memEntity.buildBdbClsDefSettingEntity();
@@ -109,17 +172,17 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
             retData = clsDefSettingIndex.put(bdbEntity);
         } catch (Throwable e) {
             logger.error("[BDB Impl] put cluster configure failure ", e);
-            result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
-                    .append("Put cluster configure failure: ")
-                    .append(e.getMessage()).toString());
+            result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("Put cluster configure failure: ")
+                            .append(e.getMessage()).toString());
             return result.isSuccess();
         }
         result.setSuccResult(retData == null);
         return result.isSuccess();
     }
 
-    @Override
-    public boolean delClusterConfig(String key) {
+    private boolean delClusterConfigFromBdb(String key) {
         try {
             clsDefSettingIndex.delete(key);
         } catch (Throwable e) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
index 7dcb9b0..f4c9586 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
@@ -23,12 +23,16 @@ import com.sleepycat.persist.EntityCursor;
 import com.sleepycat.persist.EntityStore;
 import com.sleepycat.persist.PrimaryIndex;
 import com.sleepycat.persist.StoreConfig;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.tubemq.server.common.exception.LoadMetaException;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
 import org.apache.tubemq.server.master.metastore.dao.entity.GroupBlackListEntity;
 import org.apache.tubemq.server.master.metastore.dao.mapper.GroupBlackListMapper;
 import org.slf4j.Logger;
@@ -38,14 +42,18 @@ import org.slf4j.LoggerFactory;
 
 public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
 
-
     private static final Logger logger =
             LoggerFactory.getLogger(BdbGroupBlackListMapperImpl.class);
-
-
     // consumer group black list store
     private EntityStore blackGroupStore;
     private PrimaryIndex<String/* recordKey */, BdbBlackGroupEntity> blackGroupIndex;
+    private ConcurrentHashMap<String/* recordKey */, GroupBlackListEntity>
+            groupBlackListCache = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String>>
+            groupBlackListTopicCache = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String/* groupName */, ConcurrentHashSet<String> >
+            groupBlackListGroupCache = new ConcurrentHashMap<>();
+
 
 
     public BdbGroupBlackListMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
@@ -57,6 +65,9 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
 
     @Override
     public void close() {
+        // clear cache data
+        clearCacheData();
+        // close store object
         if (blackGroupStore != null) {
             try {
                 blackGroupStore.close();
@@ -68,12 +79,14 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
     }
 
     @Override
-    public void loadConfig(ProcessResult result) throws LoadMetaException {
+    public void loadConfig() throws LoadMetaException {
         long count = 0L;
-        Map<String, GroupBlackListEntity> metaDataMap = new HashMap<>();
         EntityCursor<BdbBlackGroupEntity> cursor = null;
         logger.info("[BDB Impl] load blacklist configure start...");
         try {
+            // clear cache data
+            clearCacheData();
+            // read data from bdb
             cursor = blackGroupIndex.entities();
             for (BdbBlackGroupEntity bdbEntity : cursor) {
                 if (bdbEntity == null) {
@@ -82,11 +95,10 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
                 }
                 GroupBlackListEntity memEntity =
                         new GroupBlackListEntity(bdbEntity);
-                metaDataMap.put(memEntity.getRecordKey(), memEntity);
+                addOrUpdCacheRecord(memEntity);
                 count++;
             }
             logger.info("[BDB Impl] total blacklist configure records are {}", count);
-            result.setSuccResult(metaDataMap);
         } catch (Exception e) {
             logger.error("[BDB Impl] load blacklist configure failure ", e);
             throw new LoadMetaException(e.getMessage());
@@ -98,6 +110,126 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
         logger.info("[BDB Impl] load blacklist configure successfully...");
     }
 
+    /**
+     * Add a blacklist record that is not cached yet.
+     *
+     * The record is written to the BDB store first and is put into the cache
+     * only when that write reports success.
+     *
+     * @param memEntity the record to add
+     * @param result    receives the failure code and message, or the store result
+     * @return the success flag of {@code result}
+     */
+    @Override
+    public boolean addGroupBlackListConf(GroupBlackListEntity memEntity, ProcessResult result) {
+        final String recordKey = memEntity.getRecordKey();
+        if (groupBlackListCache.get(recordKey) == null) {
+            // unknown key: persist it, then mirror it into the cache
+            if (putGroupBlackListConfig2Bdb(memEntity, result)) {
+                addOrUpdCacheRecord(memEntity);
+            }
+        } else {
+            // key already cached: reject the request
+            String errInfo = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("The blacklist ").append(recordKey)
+                    .append("'s configure already exists, please delete it first!")
+                    .toString();
+            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), errInfo);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Update a blacklist record that is already cached.
+     *
+     * Fails with DERR_NOT_EXIST when the key is not cached and with
+     * DERR_UNCHANGED when the new record equals the cached one; otherwise the
+     * record is written to the BDB store and, on success, to the cache.
+     *
+     * @param memEntity the new content of the record
+     * @param result    receives the failure code and message, or the store result
+     * @return the success flag of {@code result}
+     */
+    @Override
+    public boolean updGroupBlackListConf(GroupBlackListEntity memEntity, ProcessResult result) {
+        GroupBlackListEntity curEntity =
+                groupBlackListCache.get(memEntity.getRecordKey());
+        if (curEntity == null) {
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("The blacklist ").append(memEntity.getRecordKey())
+                            .append("'s configure is not exists, please add record first!")
+                            .toString());
+            return result.isSuccess();
+        }
+        if (curEntity.equals(memEntity)) {
+            // nothing to write; deleting the record would not help the caller,
+            // so the message only reports that the content is unchanged
+            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("The blacklist ").append(memEntity.getRecordKey())
+                            .append("'s configure have not changed!")
+                            .toString());
+            return result.isSuccess();
+        }
+        if (putGroupBlackListConfig2Bdb(memEntity, result)) {
+            addOrUpdCacheRecord(memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Delete one blacklist record from the BDB store and from the cache.
+     *
+     * A key that is not cached is treated as already deleted.
+     *
+     * @param recordKey key of the record to delete
+     * @return always {@code true}
+     */
+    @Override
+    public boolean delGroupBlackListConf(String recordKey) {
+        if (groupBlackListCache.get(recordKey) != null) {
+            // NOTE(review): the outcome of the store delete is not checked here
+            delGroupBlackListConfigFromBdb(recordKey);
+            delCacheRecord(recordKey);
+        }
+        return true;
+    }
+
+    /**
+     * Collect the cached blacklist records indexed under a group name.
+     *
+     * @param groupName the group name to look up
+     * @return the matching records; an empty list when the group has no index entry
+     */
+    @Override
+    public List<GroupBlackListEntity> getGrpBlkLstConfByGroupName(String groupName) {
+        ConcurrentHashSet<String> recordKeys = groupBlackListGroupCache.get(groupName);
+        if (recordKeys != null && !recordKeys.isEmpty()) {
+            List<GroupBlackListEntity> matched = new ArrayList<>();
+            for (String key : recordKeys) {
+                GroupBlackListEntity cached = groupBlackListCache.get(key);
+                if (cached == null) {
+                    // index entry without a cached record: skip it
+                    continue;
+                }
+                matched.add(cached);
+            }
+            return matched;
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Delete every blacklist record that belongs to a consumer group.
+     *
+     * @param groupName the group whose records are deleted
+     * @return always {@code true}
+     */
+    @Override
+    public boolean delGroupBlackListConfByGroupName(String groupName) {
+        // resolve the records through the group index; the topic index is keyed
+        // by topic name and must not be queried with a group name
+        List<GroupBlackListEntity> curEntitys =
+                getGrpBlkLstConfByGroupName(groupName);
+        if (curEntitys.isEmpty()) {
+            return true;
+        }
+        for (GroupBlackListEntity entity : curEntitys) {
+            delGroupBlackListConf(entity.getRecordKey());
+        }
+        return true;
+    }
+
+    /**
+     * Collect the cached blacklist records indexed under a topic name.
+     *
+     * @param topicName the topic name to look up
+     * @return the matching records; an empty list when the topic has no index entry
+     */
+    @Override
+    public List<GroupBlackListEntity> getGrpBlkLstConfByTopicName(String topicName) {
+        ConcurrentHashSet<String> recordKeys = groupBlackListTopicCache.get(topicName);
+        if (recordKeys != null && !recordKeys.isEmpty()) {
+            List<GroupBlackListEntity> matched = new ArrayList<>();
+            for (String key : recordKeys) {
+                GroupBlackListEntity cached = groupBlackListCache.get(key);
+                if (cached == null) {
+                    // index entry without a cached record: skip it
+                    continue;
+                }
+                matched.add(cached);
+            }
+            return matched;
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Query the cached blacklist records.
+     *
+     * @param qryEntity filter passed to {@code isMatched}; {@code null} selects every record
+     * @return a new list holding the selected records
+     */
+    @Override
+    public List<GroupBlackListEntity> getGroupBlackListConf(GroupBlackListEntity qryEntity) {
+        List<GroupBlackListEntity> matched = new ArrayList<>();
+        for (GroupBlackListEntity cached : groupBlackListCache.values()) {
+            // without a filter every cached record qualifies
+            if (qryEntity == null || cached.isMatched(qryEntity)) {
+                matched.add(cached);
+            }
+        }
+        return matched;
+    }
+
     /**
      * Put blacklist configure info into bdb store
      *
@@ -105,26 +237,25 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
      * @param result process result with old value
      * @return
      */
-    @Override
-    public boolean putGroupBlackListConfig(GroupBlackListEntity memEntity, ProcessResult result) {
+    private boolean putGroupBlackListConfig2Bdb(GroupBlackListEntity memEntity,
+                                                ProcessResult result) {
         BdbBlackGroupEntity retData = null;
-        BdbBlackGroupEntity bdbEntity =
-                memEntity.buildBdbBlackListEntity();
+        BdbBlackGroupEntity bdbEntity = memEntity.buildBdbBlackListEntity();
         try {
             retData = blackGroupIndex.put(bdbEntity);
         } catch (Throwable e) {
             logger.error("[BDB Impl] put blacklist configure failure ", e);
-            result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
-                    .append("Put blacklist configure failure: ")
-                    .append(e.getMessage()).toString());
+            result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("Put blacklist configure failure: ")
+                            .append(e.getMessage()).toString());
             return result.isSuccess();
         }
         result.setSuccResult(retData == null);
         return result.isSuccess();
     }
 
-    @Override
-    public boolean delGroupBlackListConfig(String recordKey) {
+    private boolean delGroupBlackListConfigFromBdb(String recordKey) {
         try {
             blackGroupIndex.delete(recordKey);
         } catch (Throwable e) {
@@ -134,4 +265,60 @@ public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
         return true;
     }
 
+
+    /**
+     * Remove a record from the primary cache and from both secondary indexes.
+     *
+     * @param recordKey key of the record to drop; a key that is not cached is ignored
+     */
+    private void delCacheRecord(String recordKey) {
+        // remove (not just read) the record, otherwise it stays visible to
+        // every lookup and blocks a later add of the same key
+        GroupBlackListEntity curEntity = groupBlackListCache.remove(recordKey);
+        if (curEntity == null) {
+            return;
+        }
+        // delete topic index
+        ConcurrentHashSet<String> keySet =
+                groupBlackListTopicCache.get(curEntity.getTopicName());
+        if (keySet != null) {
+            keySet.remove(recordKey);
+            if (keySet.isEmpty()) {
+                groupBlackListTopicCache.remove(curEntity.getTopicName());
+            }
+        }
+        // delete group index: fetch the set with get() so the other records of
+        // the same group keep their index entries; the set itself is dropped
+        // only once it is empty
+        keySet = groupBlackListGroupCache.get(curEntity.getGroupName());
+        if (keySet != null) {
+            keySet.remove(recordKey);
+            if (keySet.isEmpty()) {
+                groupBlackListGroupCache.remove(curEntity.getGroupName());
+            }
+        }
+    }
+
+    /**
+     * Store a record in the primary cache and register its key in the topic
+     * index and in the group index.
+     *
+     * @param entity the record to cache
+     */
+    private void addOrUpdCacheRecord(GroupBlackListEntity entity) {
+        final String recordKey = entity.getRecordKey();
+        groupBlackListCache.put(recordKey, entity);
+        // register the key under its topic name
+        ConcurrentHashSet<String> topicKeys =
+                groupBlackListTopicCache.get(entity.getTopicName());
+        if (topicKeys == null) {
+            ConcurrentHashSet<String> newKeys = new ConcurrentHashSet<>();
+            ConcurrentHashSet<String> prevKeys =
+                    groupBlackListTopicCache.putIfAbsent(entity.getTopicName(), newKeys);
+            // use the set that actually ended up in the map
+            topicKeys = (prevKeys == null) ? newKeys : prevKeys;
+        }
+        topicKeys.add(recordKey);
+        // register the key under its group name
+        ConcurrentHashSet<String> groupKeys =
+                groupBlackListGroupCache.get(entity.getGroupName());
+        if (groupKeys == null) {
+            ConcurrentHashSet<String> newKeys = new ConcurrentHashSet<>();
+            ConcurrentHashSet<String> prevKeys =
+                    groupBlackListGroupCache.putIfAbsent(entity.getGroupName(), newKeys);
+            groupKeys = (prevKeys == null) ? newKeys : prevKeys;
+        }
+        groupKeys.add(recordKey);
+    }
+
+    /**
+     * Empty the topic index, the group index and the primary record cache.
+     */
+    private void clearCacheData() {
+        groupBlackListTopicCache.clear();
+        groupBlackListGroupCache.clear();
+        groupBlackListCache.clear();
+    }
+
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
index bb76154..b483822 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
@@ -25,10 +25,12 @@ import com.sleepycat.persist.PrimaryIndex;
 import com.sleepycat.persist.StoreConfig;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.server.common.exception.LoadMetaException;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
 import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
 import org.apache.tubemq.server.master.metastore.dao.mapper.GroupConfigMapper;
 import org.slf4j.Logger;
@@ -38,15 +40,13 @@ import org.slf4j.LoggerFactory;
 
 public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
 
-
     private static final Logger logger =
             LoggerFactory.getLogger(BdbGroupConfigMapperImpl.class);
-
-
     // consumer group configure store
     private EntityStore groupConfStore;
     private PrimaryIndex<String/* groupName */, BdbGroupFlowCtrlEntity> groupConfIndex;
-
+    private ConcurrentHashMap<String/* groupName */, GroupConfigEntity> groupConfCache =
+            new ConcurrentHashMap<>();
 
     public BdbGroupConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
         groupConfStore = new EntityStore(repEnv,
@@ -57,6 +57,7 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
 
     @Override
     public void close() {
+        groupConfCache.clear();
         if (groupConfStore != null) {
             try {
                 groupConfStore.close();
@@ -68,12 +69,12 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
     }
 
     @Override
-    public void loadConfig(ProcessResult result) throws LoadMetaException {
+    public void loadConfig() throws LoadMetaException {
         long count = 0L;
-        Map<String, GroupConfigEntity> metaDataMap = new HashMap<>();
         EntityCursor<BdbGroupFlowCtrlEntity> cursor = null;
         logger.info("[BDB Impl] load group configure start...");
         try {
+            groupConfCache.clear();
             cursor = groupConfIndex.entities();
             for (BdbGroupFlowCtrlEntity bdbEntity : cursor) {
                 if (bdbEntity == null) {
@@ -82,11 +83,10 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
                 }
                 GroupConfigEntity memEntity =
                         new GroupConfigEntity(bdbEntity);
-                metaDataMap.put(memEntity.getGroupName(), memEntity);
+                groupConfCache.put(memEntity.getGroupName(), memEntity);
                 count++;
             }
             logger.info("[BDB Impl] total group configure records are {}", count);
-            result.setSuccResult(metaDataMap);
         } catch (Exception e) {
             logger.error("[BDB Impl] load group configure failure ", e);
             throw new LoadMetaException(e.getMessage());
@@ -98,6 +98,84 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
         logger.info("[BDB Impl] load group configure successfully...");
     }
 
+    /**
+     * Add a group configure record that is not cached yet.
+     *
+     * The record is written to the BDB store first and is put into the cache
+     * only when that write reports success.
+     *
+     * @param memEntity the record to add
+     * @param result    receives the failure code and message, or the store result
+     * @return the success flag of {@code result}
+     */
+    @Override
+    public boolean addGroupConf(GroupConfigEntity memEntity, ProcessResult result) {
+        final String groupName = memEntity.getGroupName();
+        if (groupConfCache.get(groupName) == null) {
+            // unknown group: persist it, then mirror it into the cache
+            if (putGroupConfigConfig2Bdb(memEntity, result)) {
+                groupConfCache.put(groupName, memEntity);
+            }
+        } else {
+            // group already cached: reject the request
+            String errInfo = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("The group ").append(groupName)
+                    .append("'s configure already exists, please delete it first!")
+                    .toString();
+            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), errInfo);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Update a group configure record that is already cached.
+     *
+     * Fails with DERR_NOT_EXIST when the group is not cached and with
+     * DERR_UNCHANGED when the new record equals the cached one; otherwise the
+     * record is written to the BDB store and, on success, to the cache.
+     *
+     * @param memEntity the new content of the record
+     * @param result    receives the failure code and message, or the store result
+     * @return the success flag of {@code result}
+     */
+    @Override
+    public boolean updGroupConf(GroupConfigEntity memEntity, ProcessResult result) {
+        GroupConfigEntity curEntity =
+                groupConfCache.get(memEntity.getGroupName());
+        if (curEntity == null) {
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("The group ").append(memEntity.getGroupName())
+                            .append("'s configure is not exists, please add record first!")
+                            .toString());
+            return result.isSuccess();
+        }
+        if (curEntity.equals(memEntity)) {
+            // nothing to write; deleting the record would not help the caller,
+            // so the message only reports that the content is unchanged
+            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("The group ").append(memEntity.getGroupName())
+                            .append("'s configure have not changed!")
+                            .toString());
+            return result.isSuccess();
+        }
+        if (putGroupConfigConfig2Bdb(memEntity, result)) {
+            groupConfCache.put(memEntity.getGroupName(), memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Delete one group configure record from the BDB store and from the cache.
+     *
+     * A group that is not cached is treated as already deleted.
+     *
+     * @param groupName the group whose record is deleted
+     * @return always {@code true}
+     */
+    @Override
+    public boolean delGroupConf(String groupName) {
+        if (groupConfCache.get(groupName) != null) {
+            // NOTE(review): the outcome of the store delete is not checked here
+            delGroupConfigConfigFromBdb(groupName);
+            groupConfCache.remove(groupName);
+        }
+        return true;
+    }
+
+    /**
+     * Look up the cached configure record of a group.
+     *
+     * @param groupName the group name used as cache key
+     * @return the cached record, or {@code null} when the cache has no entry for the name
+     */
+    @Override
+    public GroupConfigEntity getGroupConf(String groupName) {
+        return groupConfCache.get(groupName);
+    }
+
+    /**
+     * Query the cached group configure records.
+     *
+     * @param qryEntity filter passed to {@code isMatched}; {@code null} selects every record
+     * @return a new map from group name to the selected records
+     */
+    @Override
+    public Map<String, GroupConfigEntity> getGroupConf(GroupConfigEntity qryEntity) {
+        Map<String, GroupConfigEntity> matched = new HashMap<>();
+        for (GroupConfigEntity cached : groupConfCache.values()) {
+            // without a filter every cached record qualifies
+            if (qryEntity == null || cached.isMatched(qryEntity)) {
+                matched.put(cached.getGroupName(), cached);
+            }
+        }
+        return matched;
+    }
+
     /**
      * Put Group configure info into bdb store
      *
@@ -105,8 +183,7 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
      * @param result process result with old value
      * @return
      */
-    @Override
-    public boolean putGroupConfigConfig(GroupConfigEntity memEntity, ProcessResult result) {
+    private boolean putGroupConfigConfig2Bdb(GroupConfigEntity memEntity, ProcessResult result) {
         BdbGroupFlowCtrlEntity retData = null;
         BdbGroupFlowCtrlEntity bdbEntity =
                 memEntity.buildBdbGroupFlowCtrlEntity();
@@ -114,17 +191,17 @@ public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
             retData = groupConfIndex.put(bdbEntity);
         } catch (Throwable e) {
             logger.error("[BDB Impl] put group configure failure ", e);
-            result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
-                    .append("Put group configure failure: ")
-                    .append(e.getMessage()).toString());
+            result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("Put group configure failure: ")
+                            .append(e.getMessage()).toString());
             return result.isSuccess();
         }
         result.setSuccResult(retData == null);
         return result.isSuccess();
     }
 
-    @Override
-    public boolean delGroupConfigConfig(String recordKey) {
+    private boolean delGroupConfigConfigFromBdb(String recordKey) {
         try {
             groupConfIndex.delete(recordKey);
         } catch (Throwable e) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
index 5280053..80ab930 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
@@ -23,12 +23,16 @@ import com.sleepycat.persist.EntityCursor;
 import com.sleepycat.persist.EntityStore;
 import com.sleepycat.persist.PrimaryIndex;
 import com.sleepycat.persist.StoreConfig;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.tubemq.server.common.exception.LoadMetaException;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
 import org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntity;
 import org.apache.tubemq.server.master.metastore.dao.mapper.GroupFilterCtrlMapper;
 import org.slf4j.Logger;
@@ -38,14 +42,19 @@ import org.slf4j.LoggerFactory;
 
 public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
 
-
     private static final Logger logger =
             LoggerFactory.getLogger(BdbGroupFilterCtrlMapperImpl.class);
-
-
     // consumer group filter control store
     private EntityStore groupFilterStore;
     private PrimaryIndex<String/* recordKey */, BdbGroupFilterCondEntity> groupFilterIndex;
+    // configure cache
+    private ConcurrentHashMap<String/* recordKey */, GroupFilterCtrlEntity>
+            groupFilterCtrlCache = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String>>
+            groupFilterCtrlTopicCache = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String/* groupName */, ConcurrentHashSet<String> >
+            groupFilterCtrlGroupCache = new ConcurrentHashMap<>();
+
 
 
     public BdbGroupFilterCtrlMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
@@ -57,6 +66,7 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
 
     @Override
     public void close() {
+        clearCacheData();
         if (groupFilterStore != null) {
             try {
                 groupFilterStore.close();
@@ -68,12 +78,12 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
     }
 
     @Override
-    public void loadConfig(ProcessResult result) throws LoadMetaException {
+    public void loadConfig() throws LoadMetaException {
         long count = 0L;
-        Map<String, GroupFilterCtrlEntity> metaDataMap = new HashMap<>();
         EntityCursor<BdbGroupFilterCondEntity> cursor = null;
         logger.info("[BDB Impl] load filter configure start...");
         try {
+            clearCacheData();
             cursor = groupFilterIndex.entities();
             for (BdbGroupFilterCondEntity bdbEntity : cursor) {
                 if (bdbEntity == null) {
@@ -82,11 +92,10 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
                 }
                 GroupFilterCtrlEntity memEntity =
                         new GroupFilterCtrlEntity(bdbEntity);
-                metaDataMap.put(memEntity.getRecordKey(), memEntity);
+                addOrUpdCacheRecord(memEntity);
                 count++;
             }
             logger.info("[BDB Impl] total filter configure records are {}", count);
-            result.setSuccResult(metaDataMap);
         } catch (Exception e) {
             logger.error("[BDB Impl] load filter configure failure ", e);
             throw new LoadMetaException(e.getMessage());
@@ -98,6 +107,95 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
         logger.info("[BDB Impl] load filter configure successfully...");
     }
 
+    /**
+     * Add a group filter record that is not cached yet.
+     *
+     * The record is written to the BDB store first and is put into the cache
+     * only when that write reports success.
+     *
+     * @param memEntity the record to add
+     * @param result    receives the failure code and message, or the store result
+     * @return the success flag of {@code result}
+     */
+    @Override
+    public boolean addGroupFilterCtrlConf(GroupFilterCtrlEntity memEntity, ProcessResult result) {
+        final String recordKey = memEntity.getRecordKey();
+        if (groupFilterCtrlCache.get(recordKey) == null) {
+            // unknown key: persist it, then mirror it into the cache
+            if (putGroupFilterCtrlConfig2Bdb(memEntity, result)) {
+                addOrUpdCacheRecord(memEntity);
+            }
+        } else {
+            // key already cached: reject the request
+            String errInfo = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("The group filter ").append(recordKey)
+                    .append("'s configure already exists, please delete it first!")
+                    .toString();
+            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), errInfo);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Update a group filter record that is already cached.
+     *
+     * Fails with DERR_NOT_EXIST when the key is not cached and with
+     * DERR_UNCHANGED when the new record equals the cached one; otherwise the
+     * record is written to the BDB store and, on success, to the cache.
+     *
+     * @param memEntity the new content of the record
+     * @param result    receives the failure code and message, or the store result
+     * @return the success flag of {@code result}
+     */
+    @Override
+    public boolean updGroupFilterCtrlConf(GroupFilterCtrlEntity memEntity, ProcessResult result) {
+        GroupFilterCtrlEntity curEntity =
+                groupFilterCtrlCache.get(memEntity.getRecordKey());
+        if (curEntity == null) {
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("The group filter ").append(memEntity.getRecordKey())
+                            .append("'s configure is not exists, please add record first!")
+                            .toString());
+            return result.isSuccess();
+        }
+        if (curEntity.equals(memEntity)) {
+            // nothing to write; deleting the record would not help the caller,
+            // so the message only reports that the content is unchanged
+            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("The group filter ").append(memEntity.getRecordKey())
+                            .append("'s configure have not changed!")
+                            .toString());
+            return result.isSuccess();
+        }
+        if (putGroupFilterCtrlConfig2Bdb(memEntity, result)) {
+            addOrUpdCacheRecord(memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Delete one group filter record from the BDB store and from the cache.
+     *
+     * A key that is not cached is treated as already deleted.
+     *
+     * @param recordKey key of the record to delete
+     * @return always {@code true}
+     */
+    @Override
+    public boolean delGroupFilterCtrlConf(String recordKey) {
+        if (groupFilterCtrlCache.get(recordKey) != null) {
+            // NOTE(review): the outcome of the store delete is not checked here
+            delGroupFilterCtrlConfigFromBdb(recordKey);
+            delCacheRecord(recordKey);
+        }
+        return true;
+    }
+
+    /**
+     * Collect the cached group filter records indexed under a group name.
+     *
+     * @param groupName the group name to look up
+     * @return the matching records; an empty list when the group has no index entry
+     */
+    @Override
+    public List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(String groupName) {
+        ConcurrentHashSet<String> recordKeys = groupFilterCtrlGroupCache.get(groupName);
+        if (recordKeys != null && !recordKeys.isEmpty()) {
+            List<GroupFilterCtrlEntity> matched = new ArrayList<>();
+            for (String key : recordKeys) {
+                GroupFilterCtrlEntity cached = groupFilterCtrlCache.get(key);
+                if (cached == null) {
+                    // index entry without a cached record: skip it
+                    continue;
+                }
+                matched.add(cached);
+            }
+            return matched;
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Query the cached group filter records.
+     *
+     * @param qryEntity filter passed to {@code isMatched}; {@code null} selects every record
+     * @return a new list holding the selected records
+     */
+    @Override
+    public List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(GroupFilterCtrlEntity qryEntity) {
+        List<GroupFilterCtrlEntity> matched = new ArrayList<>();
+        for (GroupFilterCtrlEntity cached : groupFilterCtrlCache.values()) {
+            // without a filter every cached record qualifies
+            if (qryEntity == null || cached.isMatched(qryEntity)) {
+                matched.add(cached);
+            }
+        }
+        return matched;
+    }
+
     /**
      * Put Group filter configure info into bdb store
      *
@@ -105,8 +203,7 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
      * @param result process result with old value
      * @return
      */
-    @Override
-    public boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity memEntity, ProcessResult result) {
+    private boolean putGroupFilterCtrlConfig2Bdb(GroupFilterCtrlEntity memEntity, ProcessResult result) {
         BdbGroupFilterCondEntity retData = null;
         BdbGroupFilterCondEntity bdbEntity =
                 memEntity.buildBdbGroupFilterCondEntity();
@@ -114,17 +211,17 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
             retData = groupFilterIndex.put(bdbEntity);
         } catch (Throwable e) {
             logger.error("[BDB Impl] put filter configure failure ", e);
-            result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
-                    .append("Put filter configure failure: ")
-                    .append(e.getMessage()).toString());
+            result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("Put filter configure failure: ")
+                            .append(e.getMessage()).toString());
             return result.isSuccess();
         }
         result.setSuccResult(retData == null);
         return result.isSuccess();
     }
 
-    @Override
-    public boolean delGroupFilterCtrlConfig(String recordKey) {
+    private boolean delGroupFilterCtrlConfigFromBdb(String recordKey) {
         try {
             groupFilterIndex.delete(recordKey);
         } catch (Throwable e) {
@@ -134,4 +231,58 @@ public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
         return true;
     }
 
+    /**
+     * Remove a record from the primary cache and from both secondary indexes.
+     *
+     * @param recordKey key of the record to drop; a key that is not cached is ignored
+     */
+    private void delCacheRecord(String recordKey) {
+        // remove (not just read) the record, otherwise it stays visible to
+        // every lookup and blocks a later add of the same key
+        GroupFilterCtrlEntity curEntity = groupFilterCtrlCache.remove(recordKey);
+        if (curEntity == null) {
+            return;
+        }
+        // delete topic index
+        ConcurrentHashSet<String> keySet =
+                groupFilterCtrlTopicCache.get(curEntity.getTopicName());
+        if (keySet != null) {
+            keySet.remove(recordKey);
+            if (keySet.isEmpty()) {
+                groupFilterCtrlTopicCache.remove(curEntity.getTopicName());
+            }
+        }
+        // delete group index: fetch the set with get() so the other records of
+        // the same group keep their index entries; the set itself is dropped
+        // only once it is empty
+        keySet = groupFilterCtrlGroupCache.get(curEntity.getGroupName());
+        if (keySet != null) {
+            keySet.remove(recordKey);
+            if (keySet.isEmpty()) {
+                groupFilterCtrlGroupCache.remove(curEntity.getGroupName());
+            }
+        }
+    }
+
+    /**
+     * Store a record in the primary cache and register its key in the topic
+     * index and in the group index.
+     *
+     * @param entity the record to cache
+     */
+    private void addOrUpdCacheRecord(GroupFilterCtrlEntity entity) {
+        final String recordKey = entity.getRecordKey();
+        groupFilterCtrlCache.put(recordKey, entity);
+        // register the key under its topic name
+        ConcurrentHashSet<String> topicKeys =
+                groupFilterCtrlTopicCache.get(entity.getTopicName());
+        if (topicKeys == null) {
+            ConcurrentHashSet<String> newKeys = new ConcurrentHashSet<>();
+            ConcurrentHashSet<String> prevKeys =
+                    groupFilterCtrlTopicCache.putIfAbsent(entity.getTopicName(), newKeys);
+            // use the set that actually ended up in the map
+            topicKeys = (prevKeys == null) ? newKeys : prevKeys;
+        }
+        topicKeys.add(recordKey);
+        // register the key under its group name
+        ConcurrentHashSet<String> groupKeys =
+                groupFilterCtrlGroupCache.get(entity.getGroupName());
+        if (groupKeys == null) {
+            ConcurrentHashSet<String> newKeys = new ConcurrentHashSet<>();
+            ConcurrentHashSet<String> prevKeys =
+                    groupFilterCtrlGroupCache.putIfAbsent(entity.getGroupName(), newKeys);
+            groupKeys = (prevKeys == null) ? newKeys : prevKeys;
+        }
+        groupKeys.add(recordKey);
+    }
+
+    /**
+     * Empty the topic index, the group index and the primary record cache.
+     */
+    private void clearCacheData() {
+        groupFilterCtrlTopicCache.clear();
+        groupFilterCtrlGroupCache.clear();
+        groupFilterCtrlCache.clear();
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
index 334c423..e96cf7e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
@@ -23,12 +23,15 @@ import com.sleepycat.persist.EntityCursor;
 import com.sleepycat.persist.EntityStore;
 import com.sleepycat.persist.PrimaryIndex;
 import com.sleepycat.persist.StoreConfig;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.tubemq.server.common.exception.LoadMetaException;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
 import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
 import org.apache.tubemq.server.master.metastore.dao.mapper.TopicConfigMapper;
 import org.slf4j.Logger;
@@ -38,14 +41,20 @@ import org.slf4j.LoggerFactory;
 
 public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
 
-
     private static final Logger logger =
             LoggerFactory.getLogger(BdbTopicConfigMapperImpl.class);
 
-
     // Topic configure store
     private EntityStore topicConfStore;
     private PrimaryIndex<String/* recordKey */, BdbTopicConfEntity> topicConfIndex;
+    // data cache: in-memory copy of the topic configure records, keyed by recordKey
+    private ConcurrentHashMap<String/* recordKey */, TopicConfEntity> topicConfCache =
+            new ConcurrentHashMap<>();
+    // index: brokerId -> record keys of the cached records with that brokerId
+    private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashSet<String>>
+            topicConfBrokerCacheIndex = new ConcurrentHashMap<>();
+    // index: topicName -> record keys of the cached records with that topic name
+    private ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String> >
+            topicConfTopicNameCacheIndex = new ConcurrentHashMap<>();
+
 
 
     public BdbTopicConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
@@ -68,9 +77,8 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
     }
 
     @Override
-    public void loadConfig(ProcessResult result) throws LoadMetaException {
+    public void loadConfig() throws LoadMetaException {
         long count = 0L;
-        Map<String, TopicConfEntity> metaDataMap = new HashMap<>();
         EntityCursor<BdbTopicConfEntity> cursor = null;
         logger.info("[BDB Impl] load topic configure start...");
         try {
@@ -80,13 +88,11 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
                     logger.warn("[BDB Impl] found Null data while loading topic configure!");
                     continue;
                 }
-                TopicConfEntity memEntity =
-                        new TopicConfEntity(bdbEntity);
-                metaDataMap.put(memEntity.getRecordKey(), memEntity);
+                TopicConfEntity memEntity = new TopicConfEntity(bdbEntity);
+                addOrUpdCacheRecord(memEntity);
                 count++;
             }
             logger.info("[BDB Impl] total topic configure records are {}", count);
-            result.setSuccResult(metaDataMap);
         } catch (Exception e) {
             logger.error("[BDB Impl] load topic configure failure ", e);
             throw new LoadMetaException(e.getMessage());
@@ -98,6 +104,77 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
         logger.info("[BDB Impl] load topic configure successfully...");
     }
 
+    /**
+     * Add a new topic configure record.
+     *
+     * The add is rejected with DERR_EXISTED when a record with the same
+     * key is already cached. Otherwise the record is written to the BDB
+     * store and cached only when that write succeeds.
+     *
+     * @param memEntity the record to add
+     * @param result    receives the outcome of the operation
+     * @return the success flag carried by {@code result}
+     */
+    @Override
+    public boolean addTopicConf(TopicConfEntity memEntity, ProcessResult result) {
+        final String recordKey = memEntity.getRecordKey();
+        if (topicConfCache.containsKey(recordKey)) {
+            // a record with this key is already cached: reject the add
+            StringBuilder strBuff =
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+            strBuff.append("The topic configure ").append(recordKey)
+                    .append("'s configure already exists, please delete it first!");
+            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), strBuff.toString());
+        } else if (putTopicConfig2Bdb(memEntity, result)) {
+            addOrUpdCacheRecord(memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Update an existing topic configure record.
+     *
+     * Fails with DERR_NOT_EXIST when no record with the same key is
+     * cached, and with DERR_UNCHANGED when the cached record equals the
+     * new one. Otherwise the record is written to the BDB store and the
+     * cache is refreshed only when that write succeeds.
+     *
+     * @param memEntity the record carrying the new values
+     * @param result    receives the outcome of the operation
+     * @return the success flag carried by {@code result}
+     */
+    @Override
+    public boolean updTopicConf(TopicConfEntity memEntity, ProcessResult result) {
+        TopicConfEntity curEntity =
+                topicConfCache.get(memEntity.getRecordKey());
+        if (curEntity == null) {
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("The topic configure ").append(memEntity.getRecordKey())
+                            .append("'s configure is not exists, please add record first!")
+                            .toString());
+            return result.isSuccess();
+        }
+        if (curEntity.equals(memEntity)) {
+            // nothing to write; the hint must not tell the caller to delete the record
+            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("The topic configure ").append(memEntity.getRecordKey())
+                            .append("'s configure has not changed, no update is needed!")
+                            .toString());
+            return result.isSuccess();
+        }
+        if (putTopicConfig2Bdb(memEntity, result)) {
+            addOrUpdCacheRecord(memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Delete a topic configure record from the BDB store and the cache.
+     *
+     * A key that is not cached is treated as already deleted. The cache
+     * entry is evicted only after the store delete reports success, so
+     * that cache and store do not diverge.
+     * NOTE(review): the failure branch of delTopicConfigFromBdb is not
+     * visible in this hunk; presumably it returns false - confirm.
+     *
+     * @param recordKey key of the record to delete
+     * @return true if the record is absent or was deleted, false if the
+     *         store delete failed
+     */
+    @Override
+    public boolean delTopicConf(String recordKey) {
+        TopicConfEntity curEntity =
+                topicConfCache.get(recordKey);
+        if (curEntity == null) {
+            return true;
+        }
+        if (!delTopicConfigFromBdb(recordKey)) {
+            // keep the cached record: it is still present in the store
+            return false;
+        }
+        delCacheRecord(recordKey);
+        return true;
+    }
+
+    /**
+     * Query the cached topic configure records.
+     *
+     * @param qryEntity the query condition; {@code null} selects every
+     *                  cached record
+     * @return a new list holding the records for which
+     *         {@code isMatched(qryEntity)} is true
+     */
+    @Override
+    public List<TopicConfEntity> getTopicConf(TopicConfEntity qryEntity) {
+        if (qryEntity == null) {
+            // no condition: return a copy of the whole cache
+            return new ArrayList<>(topicConfCache.values());
+        }
+        List<TopicConfEntity> matchedList = new ArrayList<>();
+        for (TopicConfEntity candidate : topicConfCache.values()) {
+            if (!candidate.isMatched(qryEntity)) {
+                continue;
+            }
+            matchedList.add(candidate);
+        }
+        return matchedList;
+    }
+
     /**
      * Put topic configure info into bdb store
      *
@@ -105,8 +182,7 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
      * @param result process result with old value
      * @return
      */
-    @Override
-    public boolean putTopicConfig(TopicConfEntity memEntity, ProcessResult result) {
+    private boolean putTopicConfig2Bdb(TopicConfEntity memEntity, ProcessResult result) {
         BdbTopicConfEntity retData = null;
         BdbTopicConfEntity bdbEntity =
                 memEntity.buildBdbTopicConfEntity();
@@ -114,17 +190,17 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
             retData = topicConfIndex.put(bdbEntity);
         } catch (Throwable e) {
             logger.error("[BDB Impl] put topic configure failure ", e);
-            result.setFailResult(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
-                    .append("Put topic configure failure: ")
-                    .append(e.getMessage()).toString());
+            result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("Put topic configure failure: ")
+                            .append(e.getMessage()).toString());
             return result.isSuccess();
         }
         result.setSuccResult(retData == null);
         return result.isSuccess();
     }
 
-    @Override
-    public boolean delTopicConfig(String recordKey) {
+    private boolean delTopicConfigFromBdb(String recordKey) {
         try {
             topicConfIndex.delete(recordKey);
         } catch (Throwable e) {
@@ -134,4 +210,58 @@ public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
         return true;
     }
 
+    /**
+     * Remove a record from the data cache and from both index maps.
+     *
+     * @param recordKey key of the record to evict; a key that is not
+     *                  cached is ignored
+     */
+    private void delCacheRecord(String recordKey) {
+        // remove (not get): the record itself must leave the data cache
+        TopicConfEntity curEntity = topicConfCache.remove(recordKey);
+        if (curEntity == null) {
+            return;
+        }
+        // delete topic index
+        ConcurrentHashSet<String> keySet =
+                topicConfTopicNameCacheIndex.get(curEntity.getTopicName());
+        if (keySet != null) {
+            keySet.remove(recordKey);
+            if (keySet.isEmpty()) {
+                topicConfTopicNameCacheIndex.remove(curEntity.getTopicName());
+            }
+        }
+        // delete brokerId index; look the set up with get() so that the
+        // keys of the other records under the same brokerId are kept
+        keySet = topicConfBrokerCacheIndex.get(curEntity.getBrokerId());
+        if (keySet != null) {
+            keySet.remove(recordKey);
+            if (keySet.isEmpty()) {
+                topicConfBrokerCacheIndex.remove(curEntity.getBrokerId());
+            }
+        }
+    }
+
+    /**
+     * Put the record into the data cache and register its key in the
+     * topic-name index and the brokerId index.
+     *
+     * @param entity the record to add or overwrite
+     */
+    private void addOrUpdCacheRecord(TopicConfEntity entity) {
+        final String recordKey = entity.getRecordKey();
+        topicConfCache.put(recordKey, entity);
+        // topic-name index: create the key set on first use
+        ConcurrentHashSet<String> topicKeys =
+                topicConfTopicNameCacheIndex.get(entity.getTopicName());
+        if (topicKeys == null) {
+            topicKeys = new ConcurrentHashSet<>();
+            ConcurrentHashSet<String> priorSet =
+                    topicConfTopicNameCacheIndex.putIfAbsent(entity.getTopicName(), topicKeys);
+            if (priorSet != null) {
+                // a set was installed concurrently; add to that one
+                topicKeys = priorSet;
+            }
+        }
+        topicKeys.add(recordKey);
+        // brokerId index: same procedure, keyed by broker id
+        ConcurrentHashSet<String> brokerKeys =
+                topicConfBrokerCacheIndex.get(entity.getBrokerId());
+        if (brokerKeys == null) {
+            brokerKeys = new ConcurrentHashSet<>();
+            ConcurrentHashSet<String> priorSet =
+                    topicConfBrokerCacheIndex.putIfAbsent(entity.getBrokerId(), brokerKeys);
+            if (priorSet != null) {
+                brokerKeys = priorSet;
+            }
+        }
+        brokerKeys.add(recordKey);
+    }
+
+    /**
+     * Empty every in-memory cache held by this mapper: the topic-name
+     * index and the brokerId index are cleared first, then the record
+     * cache itself.
+     */
+    private void clearCacheData() {
+        topicConfTopicNameCacheIndex.clear();
+        topicConfBrokerCacheIndex.clear();
+        topicConfCache.clear();
+    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
index 9050938..f535dac 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
@@ -23,12 +23,15 @@ import com.sleepycat.persist.EntityCursor;
 import com.sleepycat.persist.EntityStore;
 import com.sleepycat.persist.PrimaryIndex;
 import com.sleepycat.persist.StoreConfig;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.server.common.exception.LoadMetaException;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
 import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
 import org.apache.tubemq.server.master.metastore.dao.mapper.TopicCtrlMapper;
 import org.slf4j.Logger;
@@ -45,8 +48,10 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
 
     // Topic control store
     private EntityStore topicCtrlStore;
-    private PrimaryIndex<String/* recordKey */, BdbTopicAuthControlEntity> topicCtrlIndex;
-
+    private PrimaryIndex<String/* topicName */, BdbTopicAuthControlEntity> topicCtrlIndex;
+    // data cache
+    private ConcurrentHashMap<String/* topicName */, TopicCtrlEntity> topicCtrlCache =
+            new ConcurrentHashMap<>();
 
     public BdbTopicCtrlMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
         topicCtrlStore = new EntityStore(repEnv,
@@ -57,6 +62,7 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
 
     @Override
     public void close() {
+        topicCtrlCache.clear();
         if (topicCtrlStore != null) {
             try {
                 topicCtrlStore.close();
@@ -68,25 +74,23 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
     }
 
     @Override
-    public void loadConfig(ProcessResult result) throws LoadMetaException {
+    public void loadConfig() throws LoadMetaException {
         long count = 0L;
-        Map<String, TopicCtrlEntity> metaDataMap = new HashMap<>();
         EntityCursor<BdbTopicAuthControlEntity> cursor = null;
         logger.info("[BDB Impl] load topic configure start...");
         try {
+            topicCtrlCache.clear();
             cursor = topicCtrlIndex.entities();
             for (BdbTopicAuthControlEntity bdbEntity : cursor) {
                 if (bdbEntity == null) {
                     logger.warn("[BDB Impl] found Null data while loading topic control!");
                     continue;
                 }
-                TopicCtrlEntity memEntity =
-                        new TopicCtrlEntity(bdbEntity);
-                metaDataMap.put(memEntity.getTopicName(), memEntity);
+                TopicCtrlEntity memEntity = new TopicCtrlEntity(bdbEntity);
+                topicCtrlCache.put(memEntity.getTopicName(), memEntity);
                 count++;
             }
             logger.info("[BDB Impl] total topic control records are {}", count);
-            result.setSuccResult(metaDataMap);
         } catch (Exception e) {
             logger.error("[BDB Impl] load topic control failure ", e);
             throw new LoadMetaException(e.getMessage());
@@ -98,6 +102,82 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
         logger.info("[BDB Impl] load topic control successfully...");
     }
 
+    /**
+     * Add a new topic control record.
+     *
+     * The add is rejected with DERR_EXISTED when a record for the same
+     * topic name is already cached. Otherwise the record is written to
+     * the BDB store and cached only when that write succeeds.
+     *
+     * @param memEntity the record to add
+     * @param result    receives the outcome of the operation
+     * @return the success flag carried by {@code result}
+     */
+    @Override
+    public boolean addTopicCtrlConf(TopicCtrlEntity memEntity, ProcessResult result) {
+        final String topicName = memEntity.getTopicName();
+        if (topicCtrlCache.containsKey(topicName)) {
+            // a record for this topic is already cached: reject the add
+            StringBuilder strBuff =
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+            strBuff.append("The topic control ").append(topicName)
+                    .append("'s configure already exists, please delete it first!");
+            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), strBuff.toString());
+        } else if (putTopicCtrlConfig2Bdb(memEntity, result)) {
+            topicCtrlCache.put(topicName, memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Update an existing topic control record.
+     *
+     * Fails with DERR_NOT_EXIST when no record for the topic name is
+     * cached, and with DERR_UNCHANGED when the cached record equals the
+     * new one. Otherwise the record is written to the BDB store and the
+     * cache is refreshed only when that write succeeds.
+     *
+     * @param memEntity the record carrying the new values
+     * @param result    receives the outcome of the operation
+     * @return the success flag carried by {@code result}
+     */
+    @Override
+    public boolean updTopicCtrlConf(TopicCtrlEntity memEntity, ProcessResult result) {
+        TopicCtrlEntity curEntity =
+                topicCtrlCache.get(memEntity.getTopicName());
+        if (curEntity == null) {
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("The topic control ").append(memEntity.getTopicName())
+                            .append("'s configure is not exists, please add record first!")
+                            .toString());
+            return result.isSuccess();
+        }
+        if (curEntity.equals(memEntity)) {
+            // nothing to write; the hint must not tell the caller to delete the record
+            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                    new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("The topic control ").append(memEntity.getTopicName())
+                            .append("'s configure has not changed, no update is needed!")
+                            .toString());
+            return result.isSuccess();
+        }
+        if (putTopicCtrlConfig2Bdb(memEntity, result)) {
+            topicCtrlCache.put(memEntity.getTopicName(), memEntity);
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Delete a topic control record from the BDB store and the cache.
+     *
+     * A topic name that is not cached is treated as already deleted.
+     * The cache entry is evicted only after the store delete reports
+     * success, so that cache and store do not diverge.
+     * NOTE(review): the failure branch of delTopicCtrlConfigFromBdb is
+     * not visible in this hunk; presumably it returns false - confirm.
+     *
+     * @param topicName topic name of the record to delete
+     * @return true if the record is absent or was deleted, false if the
+     *         store delete failed
+     */
+    @Override
+    public boolean delTopicCtrlConf(String topicName) {
+        TopicCtrlEntity curEntity =
+                topicCtrlCache.get(topicName);
+        if (curEntity == null) {
+            return true;
+        }
+        if (!delTopicCtrlConfigFromBdb(topicName)) {
+            // keep the cached record: it is still present in the store
+            return false;
+        }
+        topicCtrlCache.remove(topicName);
+        return true;
+    }
+
+    /**
+     * Look up the cached topic control record of a topic.
+     *
+     * @param topicName the topic name to look up
+     * @return the cached record, or {@code null} when none is cached
+     */
+    @Override
+    public TopicCtrlEntity getTopicCtrlConf(String topicName) {
+        final TopicCtrlEntity cachedEntity = topicCtrlCache.get(topicName);
+        return cachedEntity;
+    }
+
+    /**
+     * Query the cached topic control records.
+     * NOTE(review): the condition is typed TopicConfEntity although the
+     * cache holds TopicCtrlEntity records - confirm this is intended.
+     *
+     * @param qryEntity the query condition; {@code null} selects every
+     *                  cached record
+     * @return a new list holding the records for which
+     *         {@code isMatched(qryEntity)} is true
+     */
+    @Override
+    public List<TopicCtrlEntity> getTopicCtrlConf(TopicConfEntity qryEntity) {
+        if (qryEntity == null) {
+            // no condition: return a copy of the whole cache
+            return new ArrayList<>(topicCtrlCache.values());
+        }
+        List<TopicCtrlEntity> matchedList = new ArrayList<>();
+        for (TopicCtrlEntity candidate : topicCtrlCache.values()) {
+            if (!candidate.isMatched(qryEntity)) {
+                continue;
+            }
+            matchedList.add(candidate);
+        }
+        return matchedList;
+    }
+
     /**
      * Put topic control configure info into bdb store
      *
@@ -105,8 +185,7 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
      * @param result process result with old value
      * @return
      */
-    @Override
-    public boolean putTopicCtrlConfig(TopicCtrlEntity memEntity, ProcessResult result) {
+    private boolean putTopicCtrlConfig2Bdb(TopicCtrlEntity memEntity, ProcessResult result) {
         BdbTopicAuthControlEntity retData = null;
         BdbTopicAuthControlEntity bdbEntity =
                 memEntity.buildBdbTopicAuthControlEntity();
@@ -123,8 +202,7 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
         return result.isSuccess();
     }
 
-    @Override
-    public boolean delTopicCtrlConfig(String recordKey) {
+    private boolean delTopicCtrlConfigFromBdb(String recordKey) {
         try {
             topicCtrlIndex.delete(recordKey);
         } catch (Throwable e) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/AliveObserver.java
similarity index 70%
copy from tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
copy to tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/AliveObserver.java
index 7deadef..4c2e2ea 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/AliveObserver.java
@@ -15,17 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.server.master.metastore.dao.mapper;
 
-import org.apache.tubemq.server.common.exception.LoadMetaException;
-import org.apache.tubemq.server.common.utils.ProcessResult;
+package org.apache.tubemq.server.master.metastore.keepalive;
 
+/**
+ * Observer type accepted by KeepAlive#registerObserver.
+ * NOTE(review): presumably notified when the keep-alive state of the
+ * node changes - confirm against the KeepAlive implementations.
+ */
+public interface AliveObserver {
 
-public interface AbstractMapper {
-
-    void close();
-
-    void loadConfig(ProcessResult result) throws LoadMetaException;
+    // presumably asks the implementer to discard its cached data - confirm against callers
+    void clearCacheData();
 
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/KeepAlive.java
similarity index 60%
copy from tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
copy to tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/KeepAlive.java
index c42ea45..718c4dd 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/keepalive/KeepAlive.java
@@ -15,18 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.server.master.metastore.dao.mapper;
+package org.apache.tubemq.server.master.metastore.keepalive;
 
-import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
+import java.net.InetSocketAddress;
+import org.apache.tubemq.server.master.bdbstore.MasterGroupStatus;
 
 
 
-public interface TopicCtrlMapper extends AbstractMapper {
+/**
+ * Operations for inspecting and changing the master role of a node.
+ * NOTE(review): the per-method notes are inferred from the method
+ * names only - confirm against the implementations.
+ */
+public interface KeepAlive {
 
-    boolean putTopicCtrlConfig(TopicCtrlEntity memEntity, ProcessResult result);
+    // presumably true while this node holds the master role
+    boolean isMasterNow();
 
-    boolean delTopicCtrlConfig(String key);
+    // presumably the time at which this node became master - TODO confirm unit
+    long getMasterSinceTime();
 
+    // presumably the address of the current master node
+    InetSocketAddress getMasterAddress();
 
+    boolean isPrimaryNodeActive();
+
+    // presumably hands the master role over to another node
+    void transferMaster() throws Exception;
+
+    // registers an observer; when it is called back is not visible here
+    void registerObserver(AliveObserver eventObserver);
+
+    MasterGroupStatus getMasterGroupStatus(boolean isFromHeartbeat);
 }