You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:26 UTC

[incubator-tubemq] 28/49: [TUBEMQ-499] Add configure store

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit f5ae304281370ac17968fc4fe6e20d0c20f3b47d
Author: gosonzhang <go...@tencent.com>
AuthorDate: Fri Jan 8 18:41:23 2021 +0800

    [TUBEMQ-499] Add configure store
---
 .../apache/tubemq/corebase/utils/MixedUtils.java   |   9 +
 tubemq-core/src/main/proto/MasterService.proto     |  18 ++
 .../tubemq/server/common/TServerConstants.java     |   2 +
 .../server/master/bdbstore/BdbStoreService.java    |   7 +
 .../master/bdbstore/DefaultBdbStoreService.java    |  95 +++++++
 .../bdbentitys/BdbClusterSettingEntity.java        | 309 +++++++++++++++++++++
 .../nodemanage/nodebroker/BrokerConfManager.java   |  95 +++++++
 7 files changed, 535 insertions(+)

diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
index bfbedad..bcd0738 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
@@ -92,4 +92,13 @@ public class MixedUtils {
         dataBuffer.flip();
         return dataBuffer.array();
     }
+
+    // clamp data into the range [min, max]: returns min if data < min, max if data > max, else data
+    public static int mid(int data, int min, int max) {
+        return Math.max(min, Math.min(max, data));
+    }
+
+    public static long mid(long data, long min, long max) {
+        return Math.max(min, Math.min(max, data));
+    }
 }
diff --git a/tubemq-core/src/main/proto/MasterService.proto b/tubemq-core/src/main/proto/MasterService.proto
index cefd2d2..27e5496 100644
--- a/tubemq-core/src/main/proto/MasterService.proto
+++ b/tubemq-core/src/main/proto/MasterService.proto
@@ -61,6 +61,16 @@ message MasterBrokerAuthorizedInfo {
     optional string authAuthorizedToken = 2;
 }
 
+// Client-side configuration block.
+// Carried as the optional appdConfig field of RegisterRequestP2M / RegisterResponseM2P
+// and HeartRequestP2M / HeartResponseM2P.
+message ApprovedClientConfig {
+    required int64 configId = 1;     // presumably the id/version of the configure set - TODO confirm
+    optional int32 maxMsgSize = 2;   // presumably the allowed max message size - TODO confirm unit
+}
+
+// Cluster default configuration block.
+// Carried as the optional clsDefConfig field of RegisterRequestB2M / RegisterResponseM2B
+// and HeartRequestB2M / HeartResponseM2B.
+message ClusterDefConfig {
+    required int64 configId = 1;     // presumably the id/version of the configure set - TODO confirm
+    optional int32 maxMsgSize = 2;   // presumably the allowed max message size - TODO confirm unit
+}
+
 message RegisterRequestP2M {
     required string clientId = 1;
     repeated string topicList = 2;
@@ -68,6 +78,7 @@ message RegisterRequestP2M {
     required string hostName = 4;
     optional MasterCertificateInfo authInfo = 5;
     optional string jdkVersion = 6;
+    optional ApprovedClientConfig appdConfig = 7;
 }
 
 message RegisterResponseM2P {
@@ -77,6 +88,7 @@ message RegisterResponseM2P {
     required int64 brokerCheckSum = 4;
     repeated string brokerInfos = 5;
     optional MasterAuthorizedInfo authorizedInfo = 6;
+    optional ApprovedClientConfig appdConfig = 7;
 }
 
 message HeartRequestP2M {
@@ -85,6 +97,7 @@ message HeartRequestP2M {
     required string hostName = 3;
     repeated string topicList = 4;
     optional MasterCertificateInfo authInfo = 5;
+    optional ApprovedClientConfig appdConfig = 6;
 }
 
 message HeartResponseM2P {
@@ -97,6 +110,7 @@ message HeartResponseM2P {
     repeated string brokerInfos = 6;
     optional bool requireAuth = 7;
     optional MasterAuthorizedInfo authorizedInfo = 8;
+    optional ApprovedClientConfig appdConfig = 9;
 }
 
 message CloseRequestP2M{
@@ -208,6 +222,7 @@ message RegisterRequestB2M {
     optional int32 qryPriorityId = 12;
     optional int32 tlsPort = 13;
     optional MasterCertificateInfo authInfo = 14;
+    optional ClusterDefConfig clsDefConfig = 15;
 }
 
 message RegisterResponseM2B {
@@ -230,6 +245,7 @@ message RegisterResponseM2B {
     optional int32 qryPriorityId = 15;
     optional MasterAuthorizedInfo authorizedInfo = 16; /* Deprecated  */
     optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 17;
+    optional ClusterDefConfig clsDefConfig = 18;
 }
 
 message HeartRequestB2M {
@@ -250,6 +266,7 @@ message HeartRequestB2M {
     optional int64 flowCheckId = 13;
     optional int32 qryPriorityId = 14;
     optional MasterCertificateInfo authInfo = 15;
+    optional ClusterDefConfig clsDefConfig = 16;
 }
 
 message HeartResponseM2B {
@@ -275,6 +292,7 @@ message HeartResponseM2B {
     optional int32 qryPriorityId = 17;
     optional MasterAuthorizedInfo authorizedInfo = 18;   /* Deprecated  */
     optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 19;
+    optional ClusterDefConfig clsDefConfig = 20;
 }
 
 message CloseRequestB2M {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
index 12af51a..5793364 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
@@ -25,6 +25,8 @@ public final class TServerConstants {
     public static final String TOKEN_JOB_TOPICS = "topics";
     public static final String TOKEN_JOB_STORE_MGR = "messageStoreManager";
     public static final String TOKEN_DEFAULT_FLOW_CONTROL = "default_master_ctrl";
+    public static final String TOKEN_DEFAULT_CLUSTER_SETTING = "default_cluster_config";
+
     public static final String TOKEN_BLANK_FILTER_CONDITION = ",,";
 
     public static final int CFG_MODAUTHTOKEN_MAX_LENGTH = 128;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java
index 7b264c7..4cb2185 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
@@ -103,4 +104,10 @@ public interface BdbStoreService {
                                             boolean isNew);
 
     ConcurrentHashMap<String, BdbConsumeGroupSettingEntity> getConsumeGroupSettingMap();
+
+    boolean putBdbClusterConfEntity(BdbClusterSettingEntity clusterConfEntity, boolean isNew);
+
+    boolean delBdbClusterConfEntity();
+
+    ConcurrentHashMap<String, BdbClusterSettingEntity> getClusterDefSettingMap();
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
index b201cde..1b8a1b1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
@@ -55,11 +55,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.Server;
+import org.apache.tubemq.server.common.TServerConstants;
 import org.apache.tubemq.server.common.fileconfig.MasterReplicationConfig;
 import org.apache.tubemq.server.master.MasterConfig;
 import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
@@ -72,6 +74,7 @@ import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Bdb store service
  * like a local database manager, according to database table name, store instance, primary key, memory cache
@@ -80,6 +83,7 @@ import org.slf4j.LoggerFactory;
 public class DefaultBdbStoreService implements BdbStoreService, Server {
     private static final Logger logger = LoggerFactory.getLogger(DefaultBdbStoreService.class);
 
+    private static final String BDB_CLUSTER_SETTING_STORE_NAME = "bdbClusterSetting";
     private static final String BDB_TOPIC_CONFIG_STORE_NAME = "bdbTopicConfig";
     private static final String BDB_BROKER_CONFIG_STORE_NAME = "bdbBrokerConfig";
     private static final String BDB_CONSUMER_GROUP_STORE_NAME = "bdbConsumerGroup";
@@ -152,6 +156,11 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
     private PrimaryIndex<String/* recordKey */, BdbConsumeGroupSettingEntity> consumeGroupSettingIndex;
     private ConcurrentHashMap<String/* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap =
             new ConcurrentHashMap<>();
+    // cluster default setting store
+    private EntityStore clusterDefSettingStore;
+    private PrimaryIndex<String/* recordKey */, BdbClusterSettingEntity> clusterDefSettingIndex;
+    private ConcurrentHashMap<String/* recordKey */, BdbClusterSettingEntity> clusterDefSettingMap =
+            new ConcurrentHashMap<>();
     // service status
     private AtomicBoolean isStarted = new AtomicBoolean(false);
     // master role flag
@@ -386,6 +395,14 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
                 logger.error("[BDB Error] Close groupFlowCtrlStore error ", e);
             }
         }
+        if (clusterDefSettingStore != null) {
+            try {
+                clusterDefSettingStore.close();
+                clusterDefSettingStore = null;
+            } catch (Throwable e) {
+                logger.error("[BDB Error] Close clusterDefSettingStore error ", e);
+            }
+        }
         /* evn close */
         if (repEnv != null) {
             try {
@@ -769,6 +786,39 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
         return true;
     }
 
+    /**
+     * Write a cluster default setting entity into the BDB store.
+     *
+     * Note that the entity is always written first; the isNew flag only decides
+     * how the outcome of that write is reported to the caller.
+     *
+     * @param clusterConfEntity the entity to be stored
+     * @param isNew             true if the caller expects that no record was replaced,
+     *                          false if the caller expects that an existing record was replaced
+     * @return true if the write succeeded and matched the isNew expectation;
+     *         false if the write threw or the expectation was not met
+     */
+    @Override
+    public boolean putBdbClusterConfEntity(BdbClusterSettingEntity clusterConfEntity, boolean isNew) {
+        final BdbClusterSettingEntity previous;
+        try {
+            previous = clusterDefSettingIndex.put(clusterConfEntity);
+        } catch (Throwable e) {
+            logger.error("[BDB Error] Put ClusterConfEntity Error ", e);
+            return false;
+        }
+        // a null result from put() is treated as "nothing was replaced", i.e. a fresh insert
+        final boolean inserted = (previous == null);
+        return isNew == inserted;
+    }
+
+    /**
+     * Delete the cluster default setting record from the BDB store.
+     *
+     * The record key is fixed to TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING.
+     *
+     * @return false if the delete call threw (the error is logged), otherwise true;
+     *         the value returned by the index delete itself is not inspected
+     */
+    @Override
+    public boolean delBdbClusterConfEntity() {
+        try {
+            clusterDefSettingIndex.delete(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+        } catch (Throwable e) {
+            logger.error("[BDB Error] delBdbClusterConfEntity Error ", e);
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public ConcurrentHashMap<String,
             ConcurrentHashMap<String, BdbConsumerGroupEntity>> getConsumerGroupNameAccControlMap() {
@@ -797,6 +847,11 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
         return this.consumeGroupSettingMap;
     }
 
+    /**
+     * Get the memory cache of the cluster default setting records.
+     *
+     * @return the internal map instance itself (not a copy), keyed by record key
+     */
+    @Override
+    public ConcurrentHashMap<String, BdbClusterSettingEntity> getClusterDefSettingMap() {
+        return this.clusterDefSettingMap;
+    }
+
     /**
      * Get master group status
      *
@@ -977,6 +1032,10 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
                 new EntityStore(repEnv, BDB_CONSUME_GROUP_SETTING_STORE_NAME, storeConfig);
         consumeGroupSettingIndex =
                 consumeGroupSettingStore.getPrimaryIndex(String.class, BdbConsumeGroupSettingEntity.class);
+        clusterDefSettingStore =
+                new EntityStore(repEnv, BDB_CLUSTER_SETTING_STORE_NAME, storeConfig);
+        clusterDefSettingIndex =
+                clusterDefSettingStore.getPrimaryIndex(String.class, BdbClusterSettingEntity.class);
     }
 
     /**
@@ -1394,6 +1453,41 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
         logger.info("loadConsumeGroupSettingUnits successfully...");
     }
 
+
+    private void loadClusterDefSettingUnits() throws Exception {
+        long count = 0L;
+        EntityCursor<BdbClusterSettingEntity> cursor = null;
+        logger.info("loadClusterDefSettingUnits start...");
+        try {
+            cursor = clusterDefSettingIndex.entities();
+            clusterDefSettingMap.clear();
+            StringBuilder sBuilder = logger.isDebugEnabled() ? new StringBuilder(512) : null;
+            logger.debug("[loadClusterDefSettingUnits] Load consumer group begin:");
+            for (BdbClusterSettingEntity bdbEntity : cursor) {
+                if (bdbEntity == null) {
+                    logger.warn("[BDB Error] Found Null data while loading from clusterDefSettingIndex!");
+                    continue;
+                }
+                clusterDefSettingMap.put(bdbEntity.getRecordKey(), bdbEntity);
+                count++;
+                if (logger.isDebugEnabled()) {
+                    logger.debug(bdbEntity.toJsonString(sBuilder).toString());
+                    sBuilder.delete(0, sBuilder.length());
+                }
+            }
+            logger.debug("[loadClusterDefSettingUnits] Load consumer group finished!");
+            logger.info("[loadClusterDefSettingUnits] total load records are {}", count);
+        } catch (Exception e) {
+            logger.error("[loadClusterDefSettingUnits error] ", e);
+            throw e;
+        } finally {
+            if (cursor != null) {
+                cursor.close();
+            }
+        }
+        logger.info("loadClusterDefSettingUnits successfully...");
+    }
+
     public class Listener implements StateChangeListener {
         @Override
         public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException {
@@ -1424,6 +1518,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
                             if (!isMaster) {
                                 try {
                                     clearCachedRunData();
+                                    loadClusterDefSettingUnits();
                                     loadBrokerConfUnits();
                                     loadTopicConfUnits();
                                     loadGroupFlowCtrlUnits();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
new file mode 100644
index 0000000..ca6e1b4
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.bdbstore.bdbentitys;
+
+import com.sleepycat.persist.model.Entity;
+import com.sleepycat.persist.model.PrimaryKey;
+import java.io.Serializable;
+import java.util.Date;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
+
+
+/**
+ * Stores the cluster default setting.
+ *
+ * The class is a Berkeley DB persistent entity (see the @Entity / @PrimaryKey
+ * annotations) whose primary key is the record key. Numeric settings default to
+ * TBaseConstants.META_VALUE_UNDEFINED and the two accept flags default to true.
+ */
+@Entity
+public class BdbClusterSettingEntity implements Serializable {
+
+    private static final long serialVersionUID = -3259439355290322115L;
+
+    // primary key of the record in the entity store
+    @PrimaryKey
+    private String recordKey = "";
+    // broker tcp port
+    private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED;
+    // broker tls port
+    private int brokerTLSPort = TBaseConstants.META_VALUE_UNDEFINED;
+    // broker web port
+    private int brokerWebPort = TBaseConstants.META_VALUE_UNDEFINED;
+    // number of topic stores
+    private int numTopicStores = TBaseConstants.META_VALUE_UNDEFINED;
+    // number of partitions
+    private int numPartitions = TBaseConstants.META_VALUE_UNDEFINED;
+    // flush-to-disk threshold
+    private int unflushDskThreshold = TBaseConstants.META_VALUE_UNDEFINED;
+    // flush-to-disk interval
+    private int unflushDksInterval = TBaseConstants.META_VALUE_UNDEFINED;
+    // memory cache flush threshold
+    private int unflushMemThreshold = TBaseConstants.META_VALUE_UNDEFINED;
+    // memory cache flush interval
+    private int unflushMemInterval = TBaseConstants.META_VALUE_UNDEFINED;
+    // memory cache flush count
+    private int unflushMemCnt = TBaseConstants.META_VALUE_UNDEFINED;
+    private boolean acceptPublish = true;   // enable publish
+    private boolean acceptSubscribe = true; // enable subscribe
+    private String deleteWhen = "";              // delete policy execute time
+    // presumably the query priority id - TODO confirm semantics
+    private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
+    // presumably the max message size - TODO confirm unit
+    private int maxMsgSize = TBaseConstants.META_VALUE_UNDEFINED;
+    private String attributes;               // extra attribute
+    private String modifyUser;               // modify user
+    private Date modifyDate;                 // modify date
+
+    /**
+     * Default constructor, all fields keep their declared default values.
+     */
+    public BdbClusterSettingEntity() {
+    }
+
+    /**
+     * Full constructor, every argument is stored as-is into the field of the same name.
+     */
+    public BdbClusterSettingEntity(String recordKey, int brokerPort, int brokerTLSPort,
+                                   int brokerWebPort, int numTopicStores, int numPartitions,
+                                   int unflushDskThreshold, int unflushDksInterval,
+                                   int unflushMemThreshold, int unflushMemInterval,
+                                   int unflushMemCnt, boolean acceptPublish,
+                                   boolean acceptSubscribe, String deleteWhen,
+                                   int qryPriorityId, int maxMsgSize, String attributes,
+                                   String modifyUser, Date modifyDate) {
+        this.recordKey = recordKey;
+        this.brokerPort = brokerPort;
+        this.brokerTLSPort = brokerTLSPort;
+        this.brokerWebPort = brokerWebPort;
+        this.numTopicStores = numTopicStores;
+        this.numPartitions = numPartitions;
+        this.unflushDskThreshold = unflushDskThreshold;
+        this.unflushDksInterval = unflushDksInterval;
+        this.unflushMemThreshold = unflushMemThreshold;
+        this.unflushMemInterval = unflushMemInterval;
+        this.unflushMemCnt = unflushMemCnt;
+        this.acceptPublish = acceptPublish;
+        this.acceptSubscribe = acceptSubscribe;
+        this.deleteWhen = deleteWhen;
+        this.qryPriorityId = qryPriorityId;
+        this.maxMsgSize = maxMsgSize;
+        this.attributes = attributes;
+        this.modifyUser = modifyUser;
+        this.modifyDate = modifyDate;
+    }
+
+    public void setRecordKey(String recordKey) {
+        this.recordKey = recordKey;
+    }
+
+    public String getRecordKey() {
+        return recordKey;
+    }
+
+    public int getBrokerPort() {
+        return brokerPort;
+    }
+
+    public void setBrokerPort(int brokerPort) {
+        this.brokerPort = brokerPort;
+    }
+
+    public int getBrokerTLSPort() {
+        return brokerTLSPort;
+    }
+
+    public void setBrokerTLSPort(int brokerTLSPort) {
+        this.brokerTLSPort = brokerTLSPort;
+    }
+
+    public int getBrokerWebPort() {
+        return brokerWebPort;
+    }
+
+    public void setBrokerWebPort(int brokerWebPort) {
+        this.brokerWebPort = brokerWebPort;
+    }
+
+    public int getNumTopicStores() {
+        return numTopicStores;
+    }
+
+    public void setNumTopicStores(int numTopicStores) {
+        this.numTopicStores = numTopicStores;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
+
+    public void setNumPartitions(int numPartitions) {
+        this.numPartitions = numPartitions;
+    }
+
+    public int getUnflushDskThreshold() {
+        return unflushDskThreshold;
+    }
+
+    public void setUnflushDskThreshold(int unflushDskThreshold) {
+        this.unflushDskThreshold = unflushDskThreshold;
+    }
+
+    public int getUnflushDksInterval() {
+        return unflushDksInterval;
+    }
+
+    public void setUnflushDksInterval(int unflushDksInterval) {
+        this.unflushDksInterval = unflushDksInterval;
+    }
+
+    public int getUnflushMemThreshold() {
+        return unflushMemThreshold;
+    }
+
+    public void setUnflushMemThreshold(int unflushMemThreshold) {
+        this.unflushMemThreshold = unflushMemThreshold;
+    }
+
+    public int getUnflushMemInterval() {
+        return unflushMemInterval;
+    }
+
+    public void setUnflushMemInterval(int unflushMemInterval) {
+        this.unflushMemInterval = unflushMemInterval;
+    }
+
+    public int getUnflushMemCnt() {
+        return unflushMemCnt;
+    }
+
+    public void setUnflushMemCnt(int unflushMemCnt) {
+        this.unflushMemCnt = unflushMemCnt;
+    }
+
+    public boolean isAcceptPublish() {
+        return acceptPublish;
+    }
+
+    public void setAcceptPublish(boolean acceptPublish) {
+        this.acceptPublish = acceptPublish;
+    }
+
+    public boolean isAcceptSubscribe() {
+        return acceptSubscribe;
+    }
+
+    public void setAcceptSubscribe(boolean acceptSubscribe) {
+        this.acceptSubscribe = acceptSubscribe;
+    }
+
+    public String getDeleteWhen() {
+        return deleteWhen;
+    }
+
+    public void setDeleteWhen(String deleteWhen) {
+        this.deleteWhen = deleteWhen;
+    }
+
+    public int getQryPriorityId() {
+        return qryPriorityId;
+    }
+
+    public void setQryPriorityId(int qryPriorityId) {
+        this.qryPriorityId = qryPriorityId;
+    }
+
+    public int getMaxMsgSize() {
+        return maxMsgSize;
+    }
+
+    public void setMaxMsgSize(int maxMsgSize) {
+        this.maxMsgSize = maxMsgSize;
+    }
+
+    public String getAttributes() {
+        return attributes;
+    }
+
+    public void setAttributes(String attributes) {
+        this.attributes = attributes;
+    }
+
+    public String getModifyUser() {
+        return modifyUser;
+    }
+
+    public void setModifyUser(String modifyUser) {
+        this.modifyUser = modifyUser;
+    }
+
+    public Date getModifyDate() {
+        return modifyDate;
+    }
+
+    public void setModifyDate(Date modifyDate) {
+        this.modifyDate = modifyDate;
+    }
+
+    /**
+     * Serialize the fields to json format.
+     *
+     * The content is appended to the passed builder, whatever it already holds is kept.
+     * String values are appended as-is, without any JSON escaping.
+     * NOTE(review): modifyDate is handed to WebParameterUtils.date2yyyyMMddHHmmss
+     * without a null check here - confirm that the helper accepts null.
+     *
+     * @param sBuilder the builder to append to
+     * @return the same builder that was passed in
+     */
+    public StringBuilder toJsonString(final StringBuilder sBuilder) {
+        return sBuilder.append("{\"type\":\"BdbClusterSettingEntity\",")
+                .append("\"recordKey\":\"").append(recordKey).append("\"")
+                .append(",\"brokerPort\":").append(brokerPort)
+                .append(",\"brokerTLSPort\":").append(brokerTLSPort)
+                .append(",\"brokerWebPort\":").append(brokerWebPort)
+                .append(",\"numTopicStores\":").append(numTopicStores)
+                .append(",\"numPartitions\":").append(numPartitions)
+                .append(",\"unflushDskThreshold\":").append(unflushDskThreshold)
+                .append(",\"unflushDksInterval\":").append(unflushDksInterval)
+                .append(",\"unflushMemThreshold\":").append(unflushMemThreshold)
+                .append(",\"unflushMemInterval\":").append(unflushMemInterval)
+                .append(",\"unflushMemCnt\":").append(unflushMemCnt)
+                .append(",\"acceptPublish\":").append(acceptPublish)
+                .append(",\"acceptSubscribe\":").append(acceptSubscribe)
+                .append(",\"deleteWhen\":\"").append(deleteWhen).append("\"")
+                .append(",\"maxMsgSize\":").append(maxMsgSize)
+                .append(",\"qryPriorityId\":").append(qryPriorityId)
+                .append(",\"attributes\":\"").append(attributes).append("\"")
+                .append(",\"modifyUser\":\"").append(modifyUser).append("\"")
+                .append(",\"modifyDate\":\"")
+                .append(WebParameterUtils.date2yyyyMMddHHmmss(modifyDate))
+                .append("\"}");
+    }
+
+    /**
+     * Build a human readable dump of all fields via ToStringBuilder.
+     */
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this)
+                .append("recordKey", recordKey)
+                .append("brokerPort", brokerPort)
+                .append("brokerTLSPort", brokerTLSPort)
+                .append("brokerWebPort", brokerWebPort)
+                .append("numTopicStores", numTopicStores)
+                .append("numPartitions", numPartitions)
+                .append("unflushDskThreshold", unflushDskThreshold)
+                .append("unflushDksInterval", unflushDksInterval)
+                .append("unflushMemThreshold", unflushMemThreshold)
+                .append("unflushMemInterval", unflushMemInterval)
+                .append("unflushMemCnt", unflushMemCnt)
+                .append("acceptPublish", acceptPublish)
+                .append("acceptSubscribe", acceptSubscribe)
+                .append("deleteWhen", deleteWhen)
+                .append("maxMsgSize", maxMsgSize)
+                .append("qryPriorityId", qryPriorityId)
+                .append("attributes", attributes)
+                .append("modifyUser", modifyUser)
+                .append("modifyDate", modifyDate)
+                .toString();
+    }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
index 698f0d3..86a6bc4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
@@ -46,6 +46,7 @@ import org.apache.tubemq.server.master.bdbstore.DefaultBdbStoreService;
 import org.apache.tubemq.server.master.bdbstore.MasterGroupStatus;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
@@ -90,6 +91,7 @@ public class BrokerConfManager implements Server {
             ConcurrentHashMap<String /* consumerGroup */, BdbGroupFilterCondEntity>> groupFilterCondTopicMap;
     private ConcurrentHashMap<String /* groupName */, BdbGroupFlowCtrlEntity> consumeGroupFlowCtrlMap;
     private ConcurrentHashMap<String /* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap;
+    private ConcurrentHashMap<String /* recordKey */, BdbClusterSettingEntity> clusterSettingMap;
     private AtomicLong brokerInfoCheckSum = new AtomicLong(System.currentTimeMillis());
     private long lastBrokerUpdatedTime = System.currentTimeMillis();
     private long serviceStartTime = System.currentTimeMillis();
@@ -98,6 +100,8 @@ public class BrokerConfManager implements Server {
     public BrokerConfManager(DefaultBdbStoreService mBdbStoreManagerService) {
         this.mBdbStoreManagerService = mBdbStoreManagerService;
         this.replicationConfig = mBdbStoreManagerService.getReplicationConfig();
+        this.clusterSettingMap =
+                this.mBdbStoreManagerService.getClusterDefSettingMap();
         this.brokerConfStoreMap = this.mBdbStoreManagerService.getBrokerConfigMap();
         for (BdbBrokerConfEntity entity : this.brokerConfStoreMap.values()) {
             updateBrokerMaps(entity);
@@ -2013,6 +2017,97 @@ public class BrokerConfManager implements Server {
         return true;
     }
 
+    // /////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Add the cluster default setting
+     *
+     * @param bdbEntity the cluster default setting entity to be added
+     * @return true if the entity was written to the store and the cache, otherwise false
+     * @throws Exception if this node is not the master, or a record with the
+     *                   same record key is already cached
+     */
+    public boolean confAddBdbClusterDefSetting(BdbClusterSettingEntity bdbEntity)
+            throws Exception {
+        validMasterStatus();
+        BdbClusterSettingEntity existing =
+                clusterSettingMap.get(bdbEntity.getRecordKey());
+        if (existing != null) {
+            throw new Exception(new StringBuilder(512)
+                    .append("Duplicate add ClusterSetting info, exist record is: ")
+                    .append(existing).toString());
+        }
+        // persist first, the cache is only touched after the store accepted the record
+        if (!mBdbStoreManagerService.putBdbClusterConfEntity(bdbEntity, true)) {
+            return false;
+        }
+        clusterSettingMap.put(bdbEntity.getRecordKey(), bdbEntity);
+        logger.info(new StringBuilder(512)
+                .append("[ClusterSetting Success] ")
+                .append(bdbEntity).toString());
+        return true;
+    }
+
+    /**
+     * Update the cluster default setting
+     *
+     * @param bdbEntity the cluster setting entity carrying the new values
+     * @return true if the record was overwritten in the store and the cache, otherwise false
+     * @throws Exception if this node is not the master, or no record is cached
+     *                   for the record key of the passed entity
+     */
+    public boolean confUpdBdbClusterSetting(BdbClusterSettingEntity bdbEntity)
+            throws Exception {
+        validMasterStatus();
+        StringBuilder msgBuilder = new StringBuilder(512);
+        BdbClusterSettingEntity oldEntity =
+                clusterSettingMap.get(bdbEntity.getRecordKey());
+        if (oldEntity == null) {
+            throw new Exception(msgBuilder
+                    .append("Update ClusterSetting failure, not exist record for record: ")
+                    .append(bdbEntity.getRecordKey()).toString());
+        }
+        // persist first, the cache is only touched after the store accepted the record
+        if (!mBdbStoreManagerService.putBdbClusterConfEntity(bdbEntity, false)) {
+            return false;
+        }
+        clusterSettingMap.put(bdbEntity.getRecordKey(), bdbEntity);
+        // log both the replaced and the new record; toJsonString appends to the passed builder
+        msgBuilder.append("[confUpdBdbClusterSetting Success] record from : ");
+        oldEntity.toJsonString(msgBuilder).append(" to : ");
+        logger.info(bdbEntity.toJsonString(msgBuilder).toString());
+        return true;
+    }
+
+    /**
+     * Delete the cluster default setting
+     *
+     * The record is deleted from the BDB store first and removed from the memory
+     * cache only after the store delete succeeded, so that a failed delete does
+     * not leave the cache and the store out of step.
+     *
+     * @param strBuffer    a scratch string buffer used to build the log message;
+     *                     it is emptied again after a record has been deleted
+     * @return true if the record was deleted or no record existed,
+     *         false if the store delete failed
+     * @throws Exception if this node is not the master
+     */
+    public boolean confDeleteBdbClusterSetting(final StringBuilder strBuffer) throws Exception {
+        validMasterStatus();
+        BdbClusterSettingEntity curEntity =
+                this.clusterSettingMap.get(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+        if (curEntity == null) {
+            logger.info("[confDeleteBdbClusterSetting  Success], not found record");
+            return true;
+        }
+        // the store is the source of truth: keep the cached record if the delete fails
+        if (!mBdbStoreManagerService.delBdbClusterConfEntity()) {
+            logger.warn("[confDeleteBdbClusterSetting  Failure], delete record from store failed");
+            return false;
+        }
+        this.clusterSettingMap.remove(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+        strBuffer.append(
+                "[confDeleteBdbClusterSetting  Success], deleted cluster setting record :");
+        logger.info(curEntity.toJsonString(strBuffer).toString());
+        strBuffer.delete(0, strBuffer.length());
+        return true;
+    }
+
+    /**
+     * Get the cached cluster default setting
+     *
+     * @return the entity cached under TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING,
+     *         or null if no such record is cached
+     */
+    public BdbClusterSettingEntity getBdbClusterSetting() {
+        return this.clusterSettingMap.get(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+    }
+
+
     private void validMasterStatus() throws Exception {
         if (!isSelfMaster()) {
             throw new StandbyException("Please send your request to the master Node.");