You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:26 UTC
[incubator-tubemq] 28/49: [TUBEMQ-499] Add configure store
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit f5ae304281370ac17968fc4fe6e20d0c20f3b47d
Author: gosonzhang <go...@tencent.com>
AuthorDate: Fri Jan 8 18:41:23 2021 +0800
[TUBEMQ-499] Add configure store
---
.../apache/tubemq/corebase/utils/MixedUtils.java | 9 +
tubemq-core/src/main/proto/MasterService.proto | 18 ++
.../tubemq/server/common/TServerConstants.java | 2 +
.../server/master/bdbstore/BdbStoreService.java | 7 +
.../master/bdbstore/DefaultBdbStoreService.java | 95 +++++++
.../bdbentitys/BdbClusterSettingEntity.java | 309 +++++++++++++++++++++
.../nodemanage/nodebroker/BrokerConfManager.java | 95 +++++++
7 files changed, 535 insertions(+)
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
index bfbedad..bcd0738 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
@@ -92,4 +92,13 @@ public class MixedUtils {
dataBuffer.flip();
return dataBuffer.array();
}
+
+ // get the middle data between min, max, and data
+ public static int mid(int data, int min, int max) {
+ return Math.max(min, Math.min(max, data));
+ }
+
+ public static long mid(long data, long min, long max) {
+ return Math.max(min, Math.min(max, data));
+ }
}
diff --git a/tubemq-core/src/main/proto/MasterService.proto b/tubemq-core/src/main/proto/MasterService.proto
index cefd2d2..27e5496 100644
--- a/tubemq-core/src/main/proto/MasterService.proto
+++ b/tubemq-core/src/main/proto/MasterService.proto
@@ -61,6 +61,16 @@ message MasterBrokerAuthorizedInfo {
optional string authAuthorizedToken = 2;
}
+message ApprovedClientConfig {
+ required int64 configId = 1;
+ optional int32 maxMsgSize = 2;
+}
+
+message ClusterDefConfig {
+ required int64 configId = 1;
+ optional int32 maxMsgSize = 2;
+}
+
message RegisterRequestP2M {
required string clientId = 1;
repeated string topicList = 2;
@@ -68,6 +78,7 @@ message RegisterRequestP2M {
required string hostName = 4;
optional MasterCertificateInfo authInfo = 5;
optional string jdkVersion = 6;
+ optional ApprovedClientConfig appdConfig = 7;
}
message RegisterResponseM2P {
@@ -77,6 +88,7 @@ message RegisterResponseM2P {
required int64 brokerCheckSum = 4;
repeated string brokerInfos = 5;
optional MasterAuthorizedInfo authorizedInfo = 6;
+ optional ApprovedClientConfig appdConfig = 7;
}
message HeartRequestP2M {
@@ -85,6 +97,7 @@ message HeartRequestP2M {
required string hostName = 3;
repeated string topicList = 4;
optional MasterCertificateInfo authInfo = 5;
+ optional ApprovedClientConfig appdConfig = 6;
}
message HeartResponseM2P {
@@ -97,6 +110,7 @@ message HeartResponseM2P {
repeated string brokerInfos = 6;
optional bool requireAuth = 7;
optional MasterAuthorizedInfo authorizedInfo = 8;
+ optional ApprovedClientConfig appdConfig = 9;
}
message CloseRequestP2M{
@@ -208,6 +222,7 @@ message RegisterRequestB2M {
optional int32 qryPriorityId = 12;
optional int32 tlsPort = 13;
optional MasterCertificateInfo authInfo = 14;
+ optional ClusterDefConfig clsDefConfig = 15;
}
message RegisterResponseM2B {
@@ -230,6 +245,7 @@ message RegisterResponseM2B {
optional int32 qryPriorityId = 15;
optional MasterAuthorizedInfo authorizedInfo = 16; /* Deprecated */
optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 17;
+ optional ClusterDefConfig clsDefConfig = 18;
}
message HeartRequestB2M {
@@ -250,6 +266,7 @@ message HeartRequestB2M {
optional int64 flowCheckId = 13;
optional int32 qryPriorityId = 14;
optional MasterCertificateInfo authInfo = 15;
+ optional ClusterDefConfig clsDefConfig = 16;
}
message HeartResponseM2B {
@@ -275,6 +292,7 @@ message HeartResponseM2B {
optional int32 qryPriorityId = 17;
optional MasterAuthorizedInfo authorizedInfo = 18; /* Deprecated */
optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 19;
+ optional ClusterDefConfig clsDefConfig = 20;
}
message CloseRequestB2M {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
index 12af51a..5793364 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
@@ -25,6 +25,8 @@ public final class TServerConstants {
public static final String TOKEN_JOB_TOPICS = "topics";
public static final String TOKEN_JOB_STORE_MGR = "messageStoreManager";
public static final String TOKEN_DEFAULT_FLOW_CONTROL = "default_master_ctrl";
+ public static final String TOKEN_DEFAULT_CLUSTER_SETTING = "default_cluster_config";
+
public static final String TOKEN_BLANK_FILTER_CONDITION = ",,";
public static final int CFG_MODAUTHTOKEN_MAX_LENGTH = 128;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java
index 7b264c7..4cb2185 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
@@ -103,4 +104,10 @@ public interface BdbStoreService {
boolean isNew);
ConcurrentHashMap<String, BdbConsumeGroupSettingEntity> getConsumeGroupSettingMap();
+
+ boolean putBdbClusterConfEntity(BdbClusterSettingEntity clusterConfEntity, boolean isNew);
+
+ boolean delBdbClusterConfEntity();
+
+ ConcurrentHashMap<String, BdbClusterSettingEntity> getClusterDefSettingMap();
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
index b201cde..1b8a1b1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
@@ -55,11 +55,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.Server;
+import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fileconfig.MasterReplicationConfig;
import org.apache.tubemq.server.master.MasterConfig;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
@@ -72,6 +74,7 @@ import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Bdb store service
* like a local database manager, according to database table name, store instance, primary key, memory cache
@@ -80,6 +83,7 @@ import org.slf4j.LoggerFactory;
public class DefaultBdbStoreService implements BdbStoreService, Server {
private static final Logger logger = LoggerFactory.getLogger(DefaultBdbStoreService.class);
+ private static final String BDB_CLUSTER_SETTING_STORE_NAME = "bdbClusterSetting";
private static final String BDB_TOPIC_CONFIG_STORE_NAME = "bdbTopicConfig";
private static final String BDB_BROKER_CONFIG_STORE_NAME = "bdbBrokerConfig";
private static final String BDB_CONSUMER_GROUP_STORE_NAME = "bdbConsumerGroup";
@@ -152,6 +156,11 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
private PrimaryIndex<String/* recordKey */, BdbConsumeGroupSettingEntity> consumeGroupSettingIndex;
private ConcurrentHashMap<String/* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap =
new ConcurrentHashMap<>();
+ // cluster default setting store
+ private EntityStore clusterDefSettingStore;
+ private PrimaryIndex<String/* recordKey */, BdbClusterSettingEntity> clusterDefSettingIndex;
+ private ConcurrentHashMap<String/* recordKey */, BdbClusterSettingEntity> clusterDefSettingMap =
+ new ConcurrentHashMap<>();
// service status
private AtomicBoolean isStarted = new AtomicBoolean(false);
// master role flag
@@ -386,6 +395,14 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
logger.error("[BDB Error] Close groupFlowCtrlStore error ", e);
}
}
+ if (clusterDefSettingStore != null) {
+ try {
+ clusterDefSettingStore.close();
+ clusterDefSettingStore = null;
+ } catch (Throwable e) {
+ logger.error("[BDB Error] Close clusterDefSettingStore error ", e);
+ }
+ }
/* evn close */
if (repEnv != null) {
try {
@@ -769,6 +786,39 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
return true;
}
+ /**
+ * Put cluster default setting bdb entity
+ *
+ * @param clusterConfEntity
+ * @param isNew
+ * @return
+ */
+ @Override
+ public boolean putBdbClusterConfEntity(BdbClusterSettingEntity clusterConfEntity, boolean isNew) {
+ BdbClusterSettingEntity result = null;
+ try {
+ result = clusterDefSettingIndex.put(clusterConfEntity);
+ } catch (Throwable e) {
+ logger.error("[BDB Error] Put ClusterConfEntity Error ", e);
+ return false;
+ }
+ if (isNew) {
+ return result == null;
+ }
+ return result != null;
+ }
+
+ @Override
+ public boolean delBdbClusterConfEntity() {
+ try {
+ clusterDefSettingIndex.delete(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+ } catch (Throwable e) {
+ logger.error("[BDB Error] delBdbClusterConfEntity Error ", e);
+ return false;
+ }
+ return true;
+ }
+
@Override
public ConcurrentHashMap<String,
ConcurrentHashMap<String, BdbConsumerGroupEntity>> getConsumerGroupNameAccControlMap() {
@@ -797,6 +847,11 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
return this.consumeGroupSettingMap;
}
+ @Override
+ public ConcurrentHashMap<String, BdbClusterSettingEntity> getClusterDefSettingMap() {
+ return this.clusterDefSettingMap;
+ }
+
/**
* Get master group status
*
@@ -977,6 +1032,10 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
new EntityStore(repEnv, BDB_CONSUME_GROUP_SETTING_STORE_NAME, storeConfig);
consumeGroupSettingIndex =
consumeGroupSettingStore.getPrimaryIndex(String.class, BdbConsumeGroupSettingEntity.class);
+ clusterDefSettingStore =
+ new EntityStore(repEnv, BDB_CLUSTER_SETTING_STORE_NAME, storeConfig);
+ clusterDefSettingIndex =
+ clusterDefSettingStore.getPrimaryIndex(String.class, BdbClusterSettingEntity.class);
}
/**
@@ -1394,6 +1453,41 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
logger.info("loadConsumeGroupSettingUnits successfully...");
}
+
+ private void loadClusterDefSettingUnits() throws Exception {
+ long count = 0L;
+ EntityCursor<BdbClusterSettingEntity> cursor = null;
+ logger.info("loadClusterDefSettingUnits start...");
+ try {
+ cursor = clusterDefSettingIndex.entities();
+ clusterDefSettingMap.clear();
+ StringBuilder sBuilder = logger.isDebugEnabled() ? new StringBuilder(512) : null;
+ logger.debug("[loadClusterDefSettingUnits] Load consumer group begin:");
+ for (BdbClusterSettingEntity bdbEntity : cursor) {
+ if (bdbEntity == null) {
+ logger.warn("[BDB Error] Found Null data while loading from clusterDefSettingIndex!");
+ continue;
+ }
+ clusterDefSettingMap.put(bdbEntity.getRecordKey(), bdbEntity);
+ count++;
+ if (logger.isDebugEnabled()) {
+ logger.debug(bdbEntity.toJsonString(sBuilder).toString());
+ sBuilder.delete(0, sBuilder.length());
+ }
+ }
+ logger.debug("[loadClusterDefSettingUnits] Load consumer group finished!");
+ logger.info("[loadClusterDefSettingUnits] total load records are {}", count);
+ } catch (Exception e) {
+ logger.error("[loadClusterDefSettingUnits error] ", e);
+ throw e;
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ }
+ }
+ logger.info("loadClusterDefSettingUnits successfully...");
+ }
+
public class Listener implements StateChangeListener {
@Override
public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException {
@@ -1424,6 +1518,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
if (!isMaster) {
try {
clearCachedRunData();
+ loadClusterDefSettingUnits();
loadBrokerConfUnits();
loadTopicConfUnits();
loadGroupFlowCtrlUnits();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
new file mode 100644
index 0000000..ca6e1b4
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.bdbstore.bdbentitys;
+
+import com.sleepycat.persist.model.Entity;
+import com.sleepycat.persist.model.PrimaryKey;
+import java.io.Serializable;
+import java.util.Date;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
+
+
+/*
+ * store the cluster default setting
+ *
+ */
+@Entity
+public class BdbClusterSettingEntity implements Serializable {
+
+ private static final long serialVersionUID = -3259439355290322115L;
+
+ @PrimaryKey
+ private String recordKey = "";
+ //broker tcp port
+ private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED;
+ //broker tls port
+ private int brokerTLSPort = TBaseConstants.META_VALUE_UNDEFINED;
+ //broker web port
+ private int brokerWebPort = TBaseConstants.META_VALUE_UNDEFINED;
+ //store num
+ private int numTopicStores = TBaseConstants.META_VALUE_UNDEFINED;
+ //partition num
+ private int numPartitions = TBaseConstants.META_VALUE_UNDEFINED;
+ //flush disk threshold
+ private int unflushDskThreshold = TBaseConstants.META_VALUE_UNDEFINED;
+ //flush disk interval
+ private int unflushDksInterval = TBaseConstants.META_VALUE_UNDEFINED;
+ //flush memory cache threshold
+ private int unflushMemThreshold = TBaseConstants.META_VALUE_UNDEFINED;
+ //flush memory cache interval
+ private int unflushMemInterval = TBaseConstants.META_VALUE_UNDEFINED;
+ //flush memory cache count
+ private int unflushMemCnt = TBaseConstants.META_VALUE_UNDEFINED;
+ private boolean acceptPublish = true; //enable publish
+ private boolean acceptSubscribe = true; //enable subscribe
+ private String deleteWhen = ""; //delete policy execute time
+ private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
+ private int maxMsgSize = TBaseConstants.META_VALUE_UNDEFINED;
+ private String attributes; //extra attribute
+ private String modifyUser; //modify user
+ private Date modifyDate; //modify date
+
+ public BdbClusterSettingEntity() {
+ }
+
+ //Constructor
+ public BdbClusterSettingEntity(String recordKey, int brokerPort, int brokerTLSPort,
+ int brokerWebPort, int numTopicStores, int numPartitions,
+ int unflushDskThreshold, int unflushDksInterval,
+ int unflushMemThreshold, int unflushMemInterval,
+ int unflushMemCnt, boolean acceptPublish,
+ boolean acceptSubscribe, String deleteWhen,
+ int qryPriorityId, int maxMsgSize, String attributes,
+ String modifyUser, Date modifyDate) {
+ this.recordKey = recordKey;
+ this.brokerPort = brokerPort;
+ this.brokerTLSPort = brokerTLSPort;
+ this.brokerWebPort = brokerWebPort;
+ this.numTopicStores = numTopicStores;
+ this.numPartitions = numPartitions;
+ this.unflushDskThreshold = unflushDskThreshold;
+ this.unflushDksInterval = unflushDksInterval;
+ this.unflushMemThreshold = unflushMemThreshold;
+ this.unflushMemInterval = unflushMemInterval;
+ this.unflushMemCnt = unflushMemCnt;
+ this.acceptPublish = acceptPublish;
+ this.acceptSubscribe = acceptSubscribe;
+ this.deleteWhen = deleteWhen;
+ this.qryPriorityId = qryPriorityId;
+ this.maxMsgSize = maxMsgSize;
+ this.attributes = attributes;
+ this.modifyUser = modifyUser;
+ this.modifyDate = modifyDate;
+ }
+
+ public void setRecordKey(String recordKey) {
+ this.recordKey = recordKey;
+ }
+
+ public String getRecordKey() {
+ return recordKey;
+ }
+
+ public int getBrokerPort() {
+ return brokerPort;
+ }
+
+ public void setBrokerPort(int brokerPort) {
+ this.brokerPort = brokerPort;
+ }
+
+ public int getBrokerTLSPort() {
+ return brokerTLSPort;
+ }
+
+ public void setBrokerTLSPort(int brokerTLSPort) {
+ this.brokerTLSPort = brokerTLSPort;
+ }
+
+ public int getBrokerWebPort() {
+ return brokerWebPort;
+ }
+
+ public void setBrokerWebPort(int brokerWebPort) {
+ this.brokerWebPort = brokerWebPort;
+ }
+
+ public int getNumTopicStores() {
+ return numTopicStores;
+ }
+
+ public void setNumTopicStores(int numTopicStores) {
+ this.numTopicStores = numTopicStores;
+ }
+
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+
+ public void setNumPartitions(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+ public int getUnflushDskThreshold() {
+ return unflushDskThreshold;
+ }
+
+ public void setUnflushDskThreshold(int unflushDskThreshold) {
+ this.unflushDskThreshold = unflushDskThreshold;
+ }
+
+ public int getUnflushDksInterval() {
+ return unflushDksInterval;
+ }
+
+ public void setUnflushDksInterval(int unflushDksInterval) {
+ this.unflushDksInterval = unflushDksInterval;
+ }
+
+ public int getUnflushMemThreshold() {
+ return unflushMemThreshold;
+ }
+
+ public void setUnflushMemThreshold(int unflushMemThreshold) {
+ this.unflushMemThreshold = unflushMemThreshold;
+ }
+
+ public int getUnflushMemInterval() {
+ return unflushMemInterval;
+ }
+
+ public void setUnflushMemInterval(int unflushMemInterval) {
+ this.unflushMemInterval = unflushMemInterval;
+ }
+
+ public int getUnflushMemCnt() {
+ return unflushMemCnt;
+ }
+
+ public void setUnflushMemCnt(int unflushMemCnt) {
+ this.unflushMemCnt = unflushMemCnt;
+ }
+
+ public boolean isAcceptPublish() {
+ return acceptPublish;
+ }
+
+ public void setAcceptPublish(boolean acceptPublish) {
+ this.acceptPublish = acceptPublish;
+ }
+
+ public boolean isAcceptSubscribe() {
+ return acceptSubscribe;
+ }
+
+ public void setAcceptSubscribe(boolean acceptSubscribe) {
+ this.acceptSubscribe = acceptSubscribe;
+ }
+
+ public String getDeleteWhen() {
+ return deleteWhen;
+ }
+
+ public void setDeleteWhen(String deleteWhen) {
+ this.deleteWhen = deleteWhen;
+ }
+
+ public int getQryPriorityId() {
+ return qryPriorityId;
+ }
+
+ public void setQryPriorityId(int qryPriorityId) {
+ this.qryPriorityId = qryPriorityId;
+ }
+
+ public int getMaxMsgSize() {
+ return maxMsgSize;
+ }
+
+ public void setMaxMsgSize(int maxMsgSize) {
+ this.maxMsgSize = maxMsgSize;
+ }
+
+ public String getAttributes() {
+ return attributes;
+ }
+
+ public void setAttributes(String attributes) {
+ this.attributes = attributes;
+ }
+
+ public String getModifyUser() {
+ return modifyUser;
+ }
+
+ public void setModifyUser(String modifyUser) {
+ this.modifyUser = modifyUser;
+ }
+
+ public Date getModifyDate() {
+ return modifyDate;
+ }
+
+ public void setModifyDate(Date modifyDate) {
+ this.modifyDate = modifyDate;
+ }
+
+ /**
+ * Serialize field to json format
+ *
+ * @param sBuilder
+ * @return
+ */
+ public StringBuilder toJsonString(final StringBuilder sBuilder) {
+ return sBuilder.append("{\"type\":\"BdbClusterSettingEntity\",")
+ .append("\"recordKey\":\"").append(recordKey).append("\"")
+ .append(",\"brokerPort\":").append(brokerPort)
+ .append(",\"brokerTLSPort\":").append(brokerTLSPort)
+ .append(",\"brokerWebPort\":").append(brokerWebPort)
+ .append(",\"numTopicStores\":").append(numTopicStores)
+ .append(",\"numPartitions\":").append(numPartitions)
+ .append(",\"unflushDskThreshold\":").append(unflushDskThreshold)
+ .append(",\"unflushDksInterval\":").append(unflushDksInterval)
+ .append(",\"unflushMemThreshold\":").append(unflushMemThreshold)
+ .append(",\"unflushMemInterval\":").append(unflushMemInterval)
+ .append(",\"unflushMemCnt\":").append(unflushMemCnt)
+ .append(",\"acceptPublish\":").append(acceptPublish)
+ .append(",\"acceptSubscribe\":").append(acceptSubscribe)
+ .append(",\"deleteWhen\":\"").append(deleteWhen).append("\"")
+ .append(",\"maxMsgSize\":").append(maxMsgSize)
+ .append(",\"qryPriorityId\":").append(qryPriorityId)
+ .append(",\"attributes\":\"").append(attributes).append("\"")
+ .append(",\"modifyUser\":\"").append(modifyUser).append("\"")
+ .append(",\"modifyDate\":\"")
+ .append(WebParameterUtils.date2yyyyMMddHHmmss(modifyDate))
+ .append("\"}");
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("recordKey", recordKey)
+ .append("brokerPort", brokerPort)
+ .append("brokerTLSPort", brokerTLSPort)
+ .append("brokerWebPort", brokerWebPort)
+ .append("numTopicStores", numTopicStores)
+ .append("numPartitions", numPartitions)
+ .append("unflushDskThreshold", unflushDskThreshold)
+ .append("unflushDksInterval", unflushDksInterval)
+ .append("unflushMemThreshold", unflushMemThreshold)
+ .append("unflushMemInterval", unflushMemInterval)
+ .append("unflushMemCnt", unflushMemCnt)
+ .append("acceptPublish", acceptPublish)
+ .append("acceptSubscribe", acceptSubscribe)
+ .append("deleteWhen", deleteWhen)
+ .append("maxMsgSize", maxMsgSize)
+ .append("qryPriorityId", qryPriorityId)
+ .append("attributes", attributes)
+ .append("modifyUser", modifyUser)
+ .append("modifyDate", modifyDate)
+ .toString();
+ }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
index 698f0d3..86a6bc4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
@@ -46,6 +46,7 @@ import org.apache.tubemq.server.master.bdbstore.DefaultBdbStoreService;
import org.apache.tubemq.server.master.bdbstore.MasterGroupStatus;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
@@ -90,6 +91,7 @@ public class BrokerConfManager implements Server {
ConcurrentHashMap<String /* consumerGroup */, BdbGroupFilterCondEntity>> groupFilterCondTopicMap;
private ConcurrentHashMap<String /* groupName */, BdbGroupFlowCtrlEntity> consumeGroupFlowCtrlMap;
private ConcurrentHashMap<String /* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap;
+ private ConcurrentHashMap<String /* recordKey */, BdbClusterSettingEntity> clusterSettingMap;
private AtomicLong brokerInfoCheckSum = new AtomicLong(System.currentTimeMillis());
private long lastBrokerUpdatedTime = System.currentTimeMillis();
private long serviceStartTime = System.currentTimeMillis();
@@ -98,6 +100,8 @@ public class BrokerConfManager implements Server {
public BrokerConfManager(DefaultBdbStoreService mBdbStoreManagerService) {
this.mBdbStoreManagerService = mBdbStoreManagerService;
this.replicationConfig = mBdbStoreManagerService.getReplicationConfig();
+ this.clusterSettingMap =
+ this.mBdbStoreManagerService.getClusterDefSettingMap();
this.brokerConfStoreMap = this.mBdbStoreManagerService.getBrokerConfigMap();
for (BdbBrokerConfEntity entity : this.brokerConfStoreMap.values()) {
updateBrokerMaps(entity);
@@ -2013,6 +2017,97 @@ public class BrokerConfManager implements Server {
return true;
}
+ // /////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Add cluster default setting
+ *
+ * @param bdbEntity the cluster default setting entity will be add
+ * @return true if success otherwise false
+ * @throws Exception
+ */
+ public boolean confAddBdbClusterDefSetting(BdbClusterSettingEntity bdbEntity)
+ throws Exception {
+ validMasterStatus();
+ BdbClusterSettingEntity curEntity =
+ clusterSettingMap.get(bdbEntity.getRecordKey());
+ if (curEntity != null) {
+ throw new Exception(new StringBuilder(512)
+ .append("Duplicate add ClusterSetting info, exist record is: ")
+ .append(curEntity).toString());
+ }
+ boolean putResult =
+ mBdbStoreManagerService.putBdbClusterConfEntity(bdbEntity, true);
+ if (putResult) {
+ clusterSettingMap.put(bdbEntity.getRecordKey(), bdbEntity);
+ logger.info(new StringBuilder(512)
+ .append("[ClusterSetting Success] ")
+ .append(bdbEntity).toString());
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * update cluster default setting
+ *
+ * @param bdbEntity the cluster setting entity will be set
+ * @return true if success otherwise false
+ * @throws Exception
+ */
+ public boolean confUpdBdbClusterSetting(BdbClusterSettingEntity bdbEntity)
+ throws Exception {
+ validMasterStatus();
+ StringBuilder strBuffer = new StringBuilder(512);
+ BdbClusterSettingEntity curDefSettingEntity =
+ clusterSettingMap.get(bdbEntity.getRecordKey());
+ if (curDefSettingEntity == null) {
+ throw new Exception(strBuffer
+ .append("Update ClusterSetting failure, not exist record for record: ")
+ .append(bdbEntity.getRecordKey()).toString());
+ }
+ boolean putResult =
+ mBdbStoreManagerService.putBdbClusterConfEntity(bdbEntity, false);
+ if (putResult) {
+ clusterSettingMap.put(bdbEntity.getRecordKey(), bdbEntity);
+ strBuffer.append("[confUpdBdbClusterSetting Success] record from : ");
+ strBuffer = curDefSettingEntity.toJsonString(strBuffer);
+ strBuffer.append(" to : ");
+ strBuffer = bdbEntity.toJsonString(strBuffer);
+ logger.info(strBuffer.toString());
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Delete cluster default setting
+ *
+ * @param strBuffer the error info string buffer
+ * @return true if success
+ * @throws Exception
+ */
+ public boolean confDeleteBdbClusterSetting(final StringBuilder strBuffer) throws Exception {
+ validMasterStatus();
+ BdbClusterSettingEntity curEntity =
+ this.clusterSettingMap.remove(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+ if (curEntity != null) {
+ mBdbStoreManagerService.delBdbClusterConfEntity();
+ strBuffer.append(
+ "[confDeleteBdbClusterSetting Success], deleted cluster setting record :");
+ logger.info(curEntity.toJsonString(strBuffer).toString());
+ strBuffer.delete(0, strBuffer.length());
+ } else {
+ logger.info("[confDeleteBdbClusterSetting Success], not found record");
+ }
+ return true;
+ }
+
+ public BdbClusterSettingEntity getBdbClusterSetting() {
+ return this.clusterSettingMap.get(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+ }
+
+
private void validMasterStatus() throws Exception {
if (!isSelfMaster()) {
throw new StandbyException("Please send your request to the master Node.");