You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/16 01:01:52 UTC
[incubator-inlong] branch master updated: [INLONG-3154][TubeMQ] Adjust the Master.ini file reading implementation (#3155)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 57b2d3a [INLONG-3154][TubeMQ] Adjust the Master.ini file reading implementation (#3155)
57b2d3a is described below
commit 57b2d3a81cd85f1b19ee52ceb995e5b6a5933878
Author: gosonzhang <46...@qq.com>
AuthorDate: Wed Mar 16 09:01:42 2022 +0800
[INLONG-3154][TubeMQ] Adjust the Master.ini file reading implementation (#3155)
---
inlong-tubemq/conf/master.ini | 51 ++--
.../common/fileconfig/AbstractFileConfig.java | 3 +
...erReplicationConfig.java => BdbMetaConfig.java} | 291 ++++++++++---------
.../server/common/fileconfig/ZKMetaConfig.java | 49 ++++
.../inlong/tubemq/server/master/MasterConfig.java | 318 +++++++++++++++------
.../inlong/tubemq/server/master/TMaster.java | 8 +-
.../server/master/metamanage/MetaDataManager.java | 10 +-
...MetaConfigObserver.java => ConfigObserver.java} | 2 +-
.../metamanage/metastore/MetaStoreService.java | 2 +-
.../metastore/dao/mapper/MetaConfigMapper.java | 4 +-
.../metastore/impl/AbsMetaConfigMapperImpl.java | 10 +-
.../impl/bdbimpl/BdbMetaConfigMapperImpl.java | 31 +-
.../impl/bdbimpl/BdbMetaStoreServiceImpl.java | 18 +-
.../impl/zkimpl/ZKMetaConfigMapperImpl.java | 6 +-
.../nodemanage/nodebroker/DefBrokerRunManager.java | 4 +-
.../tubemq/server/master/MasterConfigTest.java | 54 +++-
.../resources/{master.ini => master-bdbstore.ini} | 182 ++++++------
.../src/test/resources/master-meta-bdb.ini} | 179 ++++++------
.../resources/{master.ini => master-meta-zk.ini} | 148 ++++------
19 files changed, 790 insertions(+), 580 deletions(-)
diff --git a/inlong-tubemq/conf/master.ini b/inlong-tubemq/conf/master.ini
index b58a83d..7965a5b 100644
--- a/inlong-tubemq/conf/master.ini
+++ b/inlong-tubemq/conf/master.ini
@@ -38,34 +38,23 @@ webResourcePath=resources
; configure useWebProxy
useWebProxy=false
-; meta data path; can be absolute, or relative to TubeMQ base directory ($BASE_DIR)
-; optional, default is "var/meta_data"
-; should be the same to `[bdbStore].bdbEnvHome` if upgrade from version prior 0.5.0
-;metaDataPath=var/meta_data
-
-[zookeeper]
-; root path of TubeMQ znodes on ZK
-zkNodeRoot=/tubemq
-; connect string of ZK servers
-zkServerAddr=localhost:2181
-; timeout of ZK heartbeat; default is 30000ms
-zkSessionTimeoutMs=30000
-; timeout of ZK connection; default is 30000ms
-zkConnectionTimeoutMs=30000
-; sync time on ZK; default is 5000ms
-zkSyncTimeMs=5000
-; interval to commits data on ZK; default is 5000ms
-zkCommitPeriodMs=5000
-
-
-[replication]
+[meta_bdb]
; name of replication group, default is `tubemqMasterGroup`, you'd better set individual value for every tubeMQ cluster
;repGroupName=tubemqMasterGroup
+
; name of current node; MUST BE DIFFERENT for every node in the same group
repNodeName=tubemqMasterGroupNode1
+
; port for node to communicate to other nodes in replication group, default is 9001
;repNodePort=9001
+
+; bdb meta data path; can be absolute, or relative to TubeMQ base directory ($BASE_DIR)
+; optional, default is "var/meta_data"
+; should be the same as `[bdbStore].bdbEnvHome` if upgrading from a version prior to 0.5.0,
+; or as `[master].metaDataPath` if upgrading from a version prior to 1.0.0
+;metaDataPath=var/meta_data
+
; helperHost(and port) for node to join master cluster and the port should keep consistent with `repNodePort`; for the
; first time of starting, this value for every node in master cluster should keep same
; the default is 127.0.0.1:9001
@@ -81,6 +70,7 @@ repNodeName=tubemqMasterGroupNode1
; actually forced to disk(if the system fails,data may lost)
; sync policy for "local", optional; default is 1(SYNC)
;metaLocalSyncPolicy=1
+
; sync policy for "replica", optional; default is 3(WRITE_NO_SYNC)
;metaReplicaSyncPolicy=3
@@ -92,3 +82,22 @@ repNodeName=tubemqMasterGroupNode1
; interval for node status check task, optional; default is 10000(ms)
;repStatusCheckTimeoutMs=10000
+
+
+;[meta_zookeeper]
+; root path of TubeMQ znodes on ZK
+;zkNodeRoot=/tubemq
+; connect string of ZK servers
+;zkServerAddr=localhost:2181
+; timeout of ZK heartbeat; default is 30000ms
+;zkSessionTimeoutMs=30000
+; timeout of ZK connection; default is 30000ms
+;zkConnectionTimeoutMs=30000
+; sync time on ZK; default is 5000ms
+;zkSyncTimeMs=5000
+; interval to commits data on ZK; default is 5000ms
+;zkCommitPeriodMs=5000
+; period of the master selection status check; default is 5000ms
+;zkMasterCheckPeriodMs=5000
+
+
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/AbstractFileConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/AbstractFileConfig.java
index 2733d02..ce559c8 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/AbstractFileConfig.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/AbstractFileConfig.java
@@ -36,6 +36,9 @@ public abstract class AbstractFileConfig {
protected static final String SECT_TOKEN_TLS = "tlsSetting";
protected static final String SECT_TOKEN_ZKEEPER = "zookeeper";
protected static final String SECT_TOKEN_REPLICATION = "replication";
+ protected static final String SECT_TOKEN_META_BDB = "meta_bdb";
+ protected static final String SECT_TOKEN_META_ZK = "meta_zookeeper";
+
private static final Logger logger =
LoggerFactory.getLogger(AbstractFileConfig.class);
private String basePath;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/MasterReplicationConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/BdbMetaConfig.java
similarity index 88%
rename from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/MasterReplicationConfig.java
rename to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/BdbMetaConfig.java
index 031ff20..2af1921 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/MasterReplicationConfig.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/BdbMetaConfig.java
@@ -1,141 +1,150 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.common.fileconfig;
-
-import com.sleepycat.je.Durability;
-
-/* Named `MasterReplicationConfig` to avoid conflict with `com.sleepycat.je.rep.ReplicationConfig` */
-public class MasterReplicationConfig {
- private String repGroupName = "tubemqMasterGroup";
- private String repNodeName;
- private int repNodePort = 9001;
- private String repHelperHost = "127.0.0.1:9001";
- private int metaLocalSyncPolicy = 1;
- private int metaReplicaSyncPolicy = 3;
- private int repReplicaAckPolicy = 1;
- private long repStatusCheckTimeoutMs = 10000;
-
- public MasterReplicationConfig() {
-
- }
-
- public String getRepGroupName() {
- return repGroupName;
- }
-
- public void setRepGroupName(String repGroupName) {
- this.repGroupName = repGroupName;
- }
-
- public String getRepNodeName() {
- return repNodeName;
- }
-
- public void setRepNodeName(String repNodeName) {
- this.repNodeName = repNodeName;
- }
-
- public int getRepNodePort() {
- return repNodePort;
- }
-
- public void setRepNodePort(int repNodePort) {
- this.repNodePort = repNodePort;
- }
-
- public String getRepHelperHost() {
- return repHelperHost;
- }
-
- public void setRepHelperHost(String repHelperHost) {
- this.repHelperHost = repHelperHost;
- }
-
- public Durability.SyncPolicy getMetaLocalSyncPolicy() {
- switch (metaLocalSyncPolicy) {
- case 1:
- return Durability.SyncPolicy.SYNC;
- case 2:
- return Durability.SyncPolicy.NO_SYNC;
- case 3:
- return Durability.SyncPolicy.WRITE_NO_SYNC;
- default:
- return Durability.SyncPolicy.SYNC;
- }
- }
-
- public void setMetaLocalSyncPolicy(int metaLocalSyncPolicy) {
- this.metaLocalSyncPolicy = metaLocalSyncPolicy;
- }
-
- public Durability.SyncPolicy getMetaReplicaSyncPolicy() {
- switch (metaReplicaSyncPolicy) {
- case 1:
- return Durability.SyncPolicy.SYNC;
- case 2:
- return Durability.SyncPolicy.NO_SYNC;
- case 3:
- return Durability.SyncPolicy.WRITE_NO_SYNC;
- default:
- return Durability.SyncPolicy.SYNC;
- }
- }
-
- public void setMetaReplicaSyncPolicy(int metaReplicaSyncPolicy) {
- this.metaReplicaSyncPolicy = metaReplicaSyncPolicy;
- }
-
- public Durability.ReplicaAckPolicy getRepReplicaAckPolicy() {
- switch (repReplicaAckPolicy) {
- case 1:
- return Durability.ReplicaAckPolicy.SIMPLE_MAJORITY;
- case 2:
- return Durability.ReplicaAckPolicy.ALL;
- case 3:
- return Durability.ReplicaAckPolicy.NONE;
- default:
- return Durability.ReplicaAckPolicy.SIMPLE_MAJORITY;
- }
- }
-
- public void setRepReplicaAckPolicy(int repReplicaAckPolicy) {
- this.repReplicaAckPolicy = repReplicaAckPolicy;
- }
-
- public long getRepStatusCheckTimeoutMs() {
- return repStatusCheckTimeoutMs;
- }
-
- public void setRepStatusCheckTimeoutMs(long repStatusCheckTimeoutMs) {
- this.repStatusCheckTimeoutMs = repStatusCheckTimeoutMs;
- }
-
- @Override
- public String toString() {
- return new StringBuilder(512)
- .append("\"MasterReplicationConfig\":{\"repGroupName\":").append(repGroupName)
- .append("\",\"repNodeName\":\"").append(repNodeName)
- .append("\",\"repNodePort\":").append(repNodePort)
- .append("\",\"repHelperHost\":\"").append(repHelperHost)
- .append("\",\"metaLocalSyncPolicy\":").append(metaLocalSyncPolicy)
- .append(",\"metaReplicaSyncPolicy\":").append(metaReplicaSyncPolicy)
- .append(",\"repReplicaAckPolicy\":").append(repReplicaAckPolicy)
- .append(",\"repStatusCheckTimeoutMs\":").append(repStatusCheckTimeoutMs)
- .append("}").toString();
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.common.fileconfig;
+
+import com.sleepycat.je.Durability;
+
+public class BdbMetaConfig {
+ private String repGroupName = "tubemqMasterGroup";
+ private String repNodeName;
+ private int repNodePort = 9001;
+ private String metaDataPath = "var/meta_data";
+ private String repHelperHost = "127.0.0.1:9001";
+ private int metaLocalSyncPolicy = 1;
+ private int metaReplicaSyncPolicy = 3;
+ private int repReplicaAckPolicy = 1;
+ private long repStatusCheckTimeoutMs = 10000;
+
+ public BdbMetaConfig() {
+
+ }
+
+ public String getRepGroupName() {
+ return repGroupName;
+ }
+
+ public void setRepGroupName(String repGroupName) {
+ this.repGroupName = repGroupName;
+ }
+
+ public String getRepNodeName() {
+ return repNodeName;
+ }
+
+ public void setRepNodeName(String repNodeName) {
+ this.repNodeName = repNodeName;
+ }
+
+ public int getRepNodePort() {
+ return repNodePort;
+ }
+
+ public void setRepNodePort(int repNodePort) {
+ this.repNodePort = repNodePort;
+ }
+
+ public String getMetaDataPath() {
+ return metaDataPath;
+ }
+
+ public void setMetaDataPath(String metaDataPath) {
+ this.metaDataPath = metaDataPath;
+ }
+
+ public String getRepHelperHost() {
+ return repHelperHost;
+ }
+
+ public void setRepHelperHost(String repHelperHost) {
+ this.repHelperHost = repHelperHost;
+ }
+
+ public Durability.SyncPolicy getMetaLocalSyncPolicy() {
+ switch (metaLocalSyncPolicy) {
+ case 1:
+ return Durability.SyncPolicy.SYNC;
+ case 2:
+ return Durability.SyncPolicy.NO_SYNC;
+ case 3:
+ return Durability.SyncPolicy.WRITE_NO_SYNC;
+ default:
+ return Durability.SyncPolicy.SYNC;
+ }
+ }
+
+ public void setMetaLocalSyncPolicy(int metaLocalSyncPolicy) {
+ this.metaLocalSyncPolicy = metaLocalSyncPolicy;
+ }
+
+ public Durability.SyncPolicy getMetaReplicaSyncPolicy() {
+ switch (metaReplicaSyncPolicy) {
+ case 1:
+ return Durability.SyncPolicy.SYNC;
+ case 2:
+ return Durability.SyncPolicy.NO_SYNC;
+ case 3:
+ return Durability.SyncPolicy.WRITE_NO_SYNC;
+ default:
+ return Durability.SyncPolicy.SYNC;
+ }
+ }
+
+ public void setMetaReplicaSyncPolicy(int metaReplicaSyncPolicy) {
+ this.metaReplicaSyncPolicy = metaReplicaSyncPolicy;
+ }
+
+ public Durability.ReplicaAckPolicy getRepReplicaAckPolicy() {
+ switch (repReplicaAckPolicy) {
+ case 1:
+ return Durability.ReplicaAckPolicy.SIMPLE_MAJORITY;
+ case 2:
+ return Durability.ReplicaAckPolicy.ALL;
+ case 3:
+ return Durability.ReplicaAckPolicy.NONE;
+ default:
+ return Durability.ReplicaAckPolicy.SIMPLE_MAJORITY;
+ }
+ }
+
+ public void setRepReplicaAckPolicy(int repReplicaAckPolicy) {
+ this.repReplicaAckPolicy = repReplicaAckPolicy;
+ }
+
+ public long getRepStatusCheckTimeoutMs() {
+ return repStatusCheckTimeoutMs;
+ }
+
+ public void setRepStatusCheckTimeoutMs(long repStatusCheckTimeoutMs) {
+ this.repStatusCheckTimeoutMs = repStatusCheckTimeoutMs;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder(512) // JSON-like fragment: string values quoted, numeric values bare
+ .append("\"BdbMetaConfig\":{\"repGroupName\":\"").append(repGroupName)
+ .append("\",\"repNodeName\":\"").append(repNodeName)
+ .append("\",\"repNodePort\":").append(repNodePort)
+ .append(",\"metaDataPath\":\"").append(metaDataPath)
+ .append("\",\"repHelperHost\":\"").append(repHelperHost)
+ .append("\",\"metaLocalSyncPolicy\":").append(metaLocalSyncPolicy)
+ .append(",\"metaReplicaSyncPolicy\":").append(metaReplicaSyncPolicy)
+ .append(",\"repReplicaAckPolicy\":").append(repReplicaAckPolicy)
+ .append(",\"repStatusCheckTimeoutMs\":").append(repStatusCheckTimeoutMs)
+ .append("}").toString();
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/ZKMetaConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/ZKMetaConfig.java
new file mode 100644
index 0000000..896dda5
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/ZKMetaConfig.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.common.fileconfig;
+
+public class ZKMetaConfig extends ZKConfig {
+
+ private long zkMasterCheckPeriodMs = 5000L;
+
+ public ZKMetaConfig() {
+
+ }
+
+ public long getZkMasterCheckPeriodMs() {
+ return zkMasterCheckPeriodMs;
+ }
+
+ public void setZkMasterCheckPeriodMs(long zkMasterCheckPeriodMs) {
+ this.zkMasterCheckPeriodMs = zkMasterCheckPeriodMs;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder(512)
+ .append("\"ZKMetaConfig\":{\"zkServerAddr\":\"").append(getZkServerAddr())
+ .append("\",\"zkNodeRoot\":\"").append(getZkNodeRoot())
+ .append("\",\"zkSessionTimeoutMs\":").append(getZkSessionTimeoutMs())
+ .append(",\"zkConnectionTimeoutMs\":").append(getZkConnectionTimeoutMs())
+ .append(",\"zkSyncTimeMs\":").append(getZkSyncTimeMs())
+ .append(",\"zkCommitPeriodMs\":").append(getZkCommitPeriodMs())
+ .append(",\"zkCommitFailRetries\":").append(getZkCommitFailRetries())
+ .append(",\"zkMasterCheckPeriodMs\":").append(zkMasterCheckPeriodMs)
+ .append("}").toString();
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
index 0daa096..2f4eba5 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
@@ -27,8 +27,8 @@ import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corerpc.RpcConstants;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.fileconfig.AbstractFileConfig;
-import org.apache.inlong.tubemq.server.common.fileconfig.MasterReplicationConfig;
-import org.apache.inlong.tubemq.server.common.fileconfig.ZKConfig;
+import org.apache.inlong.tubemq.server.common.fileconfig.BdbMetaConfig;
+import org.apache.inlong.tubemq.server.common.fileconfig.ZKMetaConfig;
import org.ini4j.Ini;
import org.ini4j.Profile;
import org.slf4j.Logger;
@@ -43,9 +43,10 @@ public class MasterConfig extends AbstractFileConfig {
private String hostName;
private int port;
private int webPort = 8080;
- private MasterReplicationConfig replicationConfig = new MasterReplicationConfig();
private TLSConfig tlsConfig;
- private ZKConfig zkConfig;
+ private boolean useBdbStoreMetaData = false;
+ private ZKMetaConfig zkMetaConfig = null;
+ private BdbMetaConfig bdbMetaConfig = null;
private int consumerBalancePeriodMs = 60 * 1000;
private int firstBalanceDelayAfterStartMs = 30 * 1000;
private int consumerHeartbeatTimeoutMs = 30 * 1000;
@@ -59,7 +60,6 @@ public class MasterConfig extends AbstractFileConfig {
private long stepChgWaitPeriodMs = 12 * 1000;
private String confModAuthToken = "ASDFGHJKL";
private String webResourcePath = "../resources";
- private String metaDataPath = "var/meta_data";
private int maxGroupBrokerConsumeRate = 50;
private int maxGroupRebalanceWaitPeriod = 2;
private int maxAutoForbiddenCnt = 5;
@@ -147,7 +147,10 @@ public class MasterConfig extends AbstractFileConfig {
}
public String getMetaDataPath() {
- return metaDataPath;
+ if (useBdbStoreMetaData) {
+ return this.bdbMetaConfig.getMetaDataPath();
+ }
+ return null;
}
/**
@@ -191,16 +194,16 @@ public class MasterConfig extends AbstractFileConfig {
return maxAutoForbiddenCnt;
}
- public MasterReplicationConfig getReplicationConfig() {
- return this.replicationConfig;
+ public BdbMetaConfig getBdbMetaConfig() {
+ return this.bdbMetaConfig;
}
public TLSConfig getTlsConfig() {
return this.tlsConfig;
}
- public ZKConfig getZkConfig() {
- return zkConfig;
+ public ZKMetaConfig getZkMetaConfig() {
+ return zkMetaConfig;
}
public boolean isStartVisitTokenCheck() {
@@ -263,18 +266,21 @@ public class MasterConfig extends AbstractFileConfig {
return maxMetaForceUpdatePeriodMs;
}
+ public boolean isUseBdbStoreMetaData() {
+ return useBdbStoreMetaData;
+ }
+
/**
* Load file section attributes
*
- * @param iniConf
+ * @param iniConf the master ini object
*/
@Override
protected void loadFileSectAttributes(final Ini iniConf) {
this.loadSystemConf(iniConf);
- this.loadReplicationSectConf(iniConf);
+ this.loadMetaDataSectConf(iniConf);
this.tlsConfig = this.loadTlsSectConf(iniConf,
TBaseConstants.META_DEFAULT_MASTER_TLS_PORT);
- this.zkConfig = loadZKeeperSectConf(iniConf);
if (this.port == this.webPort
|| (tlsConfig.isTlsEnable() && (this.tlsConfig.getTlsPort() == this.webPort))) {
throw new IllegalArgumentException(new StringBuilder(512)
@@ -282,25 +288,27 @@ public class MasterConfig extends AbstractFileConfig {
.append("port or tlsPort cannot be the same as the value of webPort!")
.toString());
}
- if (this.port == replicationConfig.getRepNodePort() || (tlsConfig.isTlsEnable()
- && (this.tlsConfig.getTlsPort() == replicationConfig.getRepNodePort()))) {
- throw new IllegalArgumentException(new StringBuilder(512)
- .append("Illegal field value configuration, the value of ")
- .append("port or tlsPort cannot be the same as the value of repNodePort!")
- .toString());
- }
- if (this.webPort == replicationConfig.getRepNodePort()) {
- throw new IllegalArgumentException(new StringBuilder(512)
- .append("Illegal field value configuration, the value of ")
- .append("webPort cannot be the same as the value of repNodePort!")
- .toString());
+ if (useBdbStoreMetaData) {
+ if (this.port == bdbMetaConfig.getRepNodePort() || (tlsConfig.isTlsEnable()
+ && (this.tlsConfig.getTlsPort() == bdbMetaConfig.getRepNodePort()))) {
+ throw new IllegalArgumentException(new StringBuilder(512)
+ .append("Illegal field value configuration, the value of ")
+ .append("port or tlsPort cannot be the same as the value of repNodePort!")
+ .toString());
+ }
+ if (this.webPort == bdbMetaConfig.getRepNodePort()) {
+ throw new IllegalArgumentException(new StringBuilder(512)
+ .append("Illegal field value configuration, the value of ")
+ .append("webPort cannot be the same as the value of repNodePort!")
+ .toString());
+ }
}
}
/**
* Load system config
*
- * @param iniConf
+ * @param iniConf the master ini object
*/
// #lizard forgives
private void loadSystemConf(final Ini iniConf) {
@@ -345,11 +353,6 @@ public class MasterConfig extends AbstractFileConfig {
}
this.webResourcePath = masterConf.get("webResourcePath").trim();
- // meta data path
- if (TStringUtils.isNotBlank(masterConf.get("metaDataPath"))) {
- this.metaDataPath = masterConf.get("metaDataPath").trim();
- }
-
if (TStringUtils.isNotBlank(masterConf.get("consumerBalancePeriodMs"))) {
this.consumerBalancePeriodMs =
this.getInt(masterConf, "consumerBalancePeriodMs");
@@ -504,14 +507,157 @@ public class MasterConfig extends AbstractFileConfig {
}
/**
+ * Load meta-data section config
+ *
+ * @param iniConf the master ini object
+ */
+ private void loadMetaDataSectConf(final Ini iniConf) {
+ if (iniConf.get(SECT_TOKEN_META_BDB) != null
+ && iniConf.get(SECT_TOKEN_META_ZK) != null) {
+ throw new IllegalArgumentException(new StringBuilder(256)
+ .append("Cannot configure both ").append(SECT_TOKEN_META_BDB).append(" and ")
+ .append(SECT_TOKEN_META_ZK).append(" meta-data sections in the same time")
+ .append(", please confirm them and retain one first!").toString());
+ }
+ Profile.Section metaSect = iniConf.get(SECT_TOKEN_META_ZK);
+ if (metaSect != null) {
+ this.useBdbStoreMetaData = false;
+ this.zkMetaConfig = loadZkMetaSectConf(iniConf);
+ return;
+ }
+ metaSect = iniConf.get(SECT_TOKEN_META_BDB);
+ if (metaSect != null) {
+ this.useBdbStoreMetaData = true;
+ this.bdbMetaConfig = loadBdbMetaSectConf(iniConf);
+ return;
+ }
+ metaSect = iniConf.get(SECT_TOKEN_REPLICATION);
+ if (metaSect != null) {
+ this.useBdbStoreMetaData = true;
+ this.bdbMetaConfig = loadReplicationSectConf(iniConf);
+ return;
+ }
+ metaSect = iniConf.get(SECT_TOKEN_BDB);
+ if (metaSect != null) {
+ this.useBdbStoreMetaData = true;
+ this.bdbMetaConfig = loadBdbStoreSectConf(iniConf);
+ return;
+ }
+ throw new IllegalArgumentException(new StringBuilder(256)
+ .append("Missing necessary meta-data section, please select ")
+ .append(SECT_TOKEN_META_ZK).append(" or ").append(SECT_TOKEN_META_BDB)
+ .append(" and configure ini again!").toString());
+ }
+
+ /**
+ * Load ZooKeeper store section configure as meta-data storage
+ *
+ * @param iniConf the master ini object
+ * @return the configured information
+ */
+
+ private ZKMetaConfig loadZkMetaSectConf(final Ini iniConf) {
+ final Profile.Section zkeeperSect = iniConf.get(SECT_TOKEN_META_ZK);
+ if (zkeeperSect == null) {
+ throw new IllegalArgumentException(new StringBuilder(256)
+ .append(SECT_TOKEN_META_ZK).append(" configure section is required!").toString());
+ }
+ Set<String> configKeySet = zkeeperSect.keySet();
+ if (configKeySet.isEmpty()) {
+ throw new IllegalArgumentException(new StringBuilder(256)
+ .append("Empty configure item in ").append(SECT_TOKEN_META_ZK)
+ .append(" section!").toString());
+ }
+ ZKMetaConfig zkMetaConfig = new ZKMetaConfig();
+ if (TStringUtils.isNotBlank(zkeeperSect.get("zkServerAddr"))) {
+ zkMetaConfig.setZkServerAddr(zkeeperSect.get("zkServerAddr").trim());
+ }
+ if (TStringUtils.isNotBlank(zkeeperSect.get("zkNodeRoot"))) {
+ zkMetaConfig.setZkNodeRoot(zkeeperSect.get("zkNodeRoot").trim());
+ }
+ if (TStringUtils.isNotBlank(zkeeperSect.get("zkSessionTimeoutMs"))) {
+ zkMetaConfig.setZkSessionTimeoutMs(getInt(zkeeperSect, "zkSessionTimeoutMs"));
+ }
+ if (TStringUtils.isNotBlank(zkeeperSect.get("zkConnectionTimeoutMs"))) {
+ zkMetaConfig.setZkConnectionTimeoutMs(getInt(zkeeperSect, "zkConnectionTimeoutMs"));
+ }
+ if (TStringUtils.isNotBlank(zkeeperSect.get("zkSyncTimeMs"))) {
+ zkMetaConfig.setZkSyncTimeMs(getInt(zkeeperSect, "zkSyncTimeMs"));
+ }
+ if (TStringUtils.isNotBlank(zkeeperSect.get("zkCommitPeriodMs"))) {
+ zkMetaConfig.setZkCommitPeriodMs(getLong(zkeeperSect, "zkCommitPeriodMs"));
+ }
+ if (TStringUtils.isNotBlank(zkeeperSect.get("zkCommitFailRetries"))) {
+ zkMetaConfig.setZkCommitFailRetries(getInt(zkeeperSect, "zkCommitFailRetries"));
+ }
+ if (TStringUtils.isNotBlank(zkeeperSect.get("zkMasterCheckPeriodMs"))) { // long-valued, like zkCommitPeriodMs
+ zkMetaConfig.setZkMasterCheckPeriodMs(getLong(zkeeperSect, "zkMasterCheckPeriodMs"));
+ }
+ return zkMetaConfig;
+ }
+
+ /**
+ * Load Berkeley DB store section configure as meta-data storage
+ *
+ * @param iniConf the master ini object
+ * @return the configured information
+ */
+ private BdbMetaConfig loadBdbMetaSectConf(final Ini iniConf) {
+ final Profile.Section repSect = iniConf.get(SECT_TOKEN_META_BDB);
+ if (repSect == null) {
+ return null;
+ }
+ Set<String> configKeySet = repSect.keySet();
+ if (configKeySet.isEmpty()) {
+ throw new IllegalArgumentException(new StringBuilder(256)
+ .append("Empty configure item in ").append(SECT_TOKEN_META_BDB)
+ .append(" section!").toString());
+ }
+ BdbMetaConfig tmpMetaConfig = new BdbMetaConfig();
+ // read configure items
+ if (TStringUtils.isNotBlank(repSect.get("repGroupName"))) {
+ tmpMetaConfig.setRepGroupName(repSect.get("repGroupName").trim());
+ }
+ if (TStringUtils.isBlank(repSect.get("repNodeName"))) {
+ getSimilarConfigField(SECT_TOKEN_META_BDB, configKeySet, "repNodeName");
+ } else {
+ tmpMetaConfig.setRepNodeName(repSect.get("repNodeName").trim());
+ }
+ if (TStringUtils.isNotBlank(repSect.get("repNodePort"))) {
+ tmpMetaConfig.setRepNodePort(getInt(repSect, "repNodePort"));
+ }
+ if (TStringUtils.isNotBlank(repSect.get("metaDataPath"))) {
+ tmpMetaConfig.setMetaDataPath(repSect.get("metaDataPath").trim());
+ }
+ if (TStringUtils.isNotBlank(repSect.get("repHelperHost"))) {
+ tmpMetaConfig.setRepHelperHost(repSect.get("repHelperHost").trim());
+ }
+ if (TStringUtils.isNotBlank(repSect.get("metaLocalSyncPolicy"))) {
+ tmpMetaConfig.setMetaLocalSyncPolicy(getInt(repSect, "metaLocalSyncPolicy"));
+ }
+ if (TStringUtils.isNotBlank(repSect.get("metaReplicaSyncPolicy"))) {
+ tmpMetaConfig.setMetaReplicaSyncPolicy(getInt(repSect, "metaReplicaSyncPolicy"));
+ }
+ if (TStringUtils.isNotBlank(repSect.get("repReplicaAckPolicy"))) {
+ tmpMetaConfig.setRepReplicaAckPolicy(getInt(repSect, "repReplicaAckPolicy"));
+ }
+ if (TStringUtils.isNotBlank(repSect.get("repStatusCheckTimeoutMs"))) {
+ tmpMetaConfig.setRepStatusCheckTimeoutMs(getLong(repSect, "repStatusCheckTimeoutMs"));
+ }
+ return tmpMetaConfig;
+ }
+
+ /**
* Deprecated: Load Berkeley DB store section config
* Just keep `loadBdbStoreSectConf` for backward compatibility
- * @param iniConf
+ *
+ * @param iniConf the master ini object
+ * @return the configured information
*/
- private boolean loadBdbStoreSectConf(final Ini iniConf) {
+ private BdbMetaConfig loadBdbStoreSectConf(final Ini iniConf) {
final Profile.Section bdbSect = iniConf.get(SECT_TOKEN_BDB);
if (bdbSect == null) {
- return false;
+ return null;
}
Set<String> configKeySet = bdbSect.keySet();
if (configKeySet.isEmpty()) {
@@ -519,70 +665,58 @@ public class MasterConfig extends AbstractFileConfig {
.append("Empty configure item in ").append(SECT_TOKEN_BDB)
.append(" section!").toString());
}
+ logger.warn("[bdbStore] section is deprecated. Please config in [meta_bdb] section.");
+ // read configure items
+ BdbMetaConfig tmpMetaConfig = new BdbMetaConfig();
if (TStringUtils.isBlank(bdbSect.get("bdbRepGroupName"))) {
getSimilarConfigField(SECT_TOKEN_BDB, configKeySet, "bdbRepGroupName");
} else {
- replicationConfig.setRepGroupName(bdbSect.get("bdbRepGroupName").trim());
+ tmpMetaConfig.setRepGroupName(bdbSect.get("bdbRepGroupName").trim());
}
if (TStringUtils.isBlank(bdbSect.get("bdbNodeName"))) {
getSimilarConfigField(SECT_TOKEN_BDB, configKeySet, "bdbNodeName");
} else {
- replicationConfig.setRepNodeName(bdbSect.get("bdbNodeName").trim());
+ tmpMetaConfig.setRepNodeName(bdbSect.get("bdbNodeName").trim());
}
- if (TStringUtils.isBlank(bdbSect.get("bdbNodePort"))) {
- replicationConfig.setRepNodePort(9001);
- } else {
- replicationConfig.setRepNodePort(getInt(bdbSect, "bdbNodePort"));
+ if (TStringUtils.isNotBlank(bdbSect.get("bdbNodePort"))) {
+ tmpMetaConfig.setRepNodePort(getInt(bdbSect, "bdbNodePort"));
}
if (TStringUtils.isBlank(bdbSect.get("bdbEnvHome"))) {
getSimilarConfigField(SECT_TOKEN_BDB, configKeySet, "bdbEnvHome");
} else {
- this.metaDataPath = bdbSect.get("bdbEnvHome").trim();
+ tmpMetaConfig.setMetaDataPath(bdbSect.get("bdbEnvHome").trim());
}
if (TStringUtils.isBlank(bdbSect.get("bdbHelperHost"))) {
getSimilarConfigField(SECT_TOKEN_BDB, configKeySet, "bdbHelperHost");
} else {
- replicationConfig.setRepHelperHost(bdbSect.get("bdbHelperHost").trim());
+ tmpMetaConfig.setRepHelperHost(bdbSect.get("bdbHelperHost").trim());
}
- if (TStringUtils.isBlank(bdbSect.get("bdbLocalSync"))) {
- replicationConfig.setMetaLocalSyncPolicy(1);
- } else {
- replicationConfig.setMetaLocalSyncPolicy(getInt(bdbSect, "bdbLocalSync"));
+ if (TStringUtils.isNotBlank(bdbSect.get("bdbLocalSync"))) {
+ tmpMetaConfig.setMetaLocalSyncPolicy(getInt(bdbSect, "bdbLocalSync"));
}
- if (TStringUtils.isBlank(bdbSect.get("bdbReplicaSync"))) {
- replicationConfig.setMetaReplicaSyncPolicy(3);
- } else {
- replicationConfig.setMetaReplicaSyncPolicy(getInt(bdbSect, "bdbReplicaSync"));
+ if (TStringUtils.isNotBlank(bdbSect.get("bdbReplicaSync"))) {
+ tmpMetaConfig.setMetaReplicaSyncPolicy(getInt(bdbSect, "bdbReplicaSync"));
}
- if (TStringUtils.isBlank(bdbSect.get("bdbReplicaAck"))) {
- replicationConfig.setRepReplicaAckPolicy(1);
- } else {
- replicationConfig.setRepReplicaAckPolicy(getInt(bdbSect, "bdbReplicaAck"));
+ if (TStringUtils.isNotBlank(bdbSect.get("bdbReplicaAck"))) {
+ tmpMetaConfig.setRepReplicaAckPolicy(getInt(bdbSect, "bdbReplicaAck"));
}
- if (TStringUtils.isBlank(bdbSect.get("bdbStatusCheckTimeoutMs"))) {
- replicationConfig.setRepStatusCheckTimeoutMs(10000);
- } else {
- replicationConfig.setRepStatusCheckTimeoutMs(getLong(bdbSect, "bdbStatusCheckTimeoutMs"));
+ if (TStringUtils.isNotBlank(bdbSect.get("bdbStatusCheckTimeoutMs"))) {
+ tmpMetaConfig.setRepStatusCheckTimeoutMs(getLong(bdbSect, "bdbStatusCheckTimeoutMs"));
}
-
- return true;
+ return tmpMetaConfig;
}
/**
- * Load Replication section config
+ * Deprecated: Load the legacy [replication] section config as Berkeley DB meta-data config
+ * Just keep `loadReplicationSectConf` for backward compatibility
*
- * @param iniConf
+ * @param iniConf the master ini object
+ * @return the configured information
*/
- private void loadReplicationSectConf(final Ini iniConf) {
+ private BdbMetaConfig loadReplicationSectConf(final Ini iniConf) {
final Profile.Section repSect = iniConf.get(SECT_TOKEN_REPLICATION);
if (repSect == null) {
- if (!this.loadBdbStoreSectConf(iniConf)) { // read [bdbStore] for backward compatibility
- throw new IllegalArgumentException(new StringBuilder(256)
- .append(SECT_TOKEN_REPLICATION).append(" configure section is required!").toString());
- }
- logger.warn("[bdbStore] section is deprecated. "
- + "Please config in [replication] section.");
- return;
+ return null;
}
Set<String> configKeySet = repSect.keySet();
if (configKeySet.isEmpty()) {
@@ -590,41 +724,53 @@ public class MasterConfig extends AbstractFileConfig {
.append("Empty configure item in ").append(SECT_TOKEN_REPLICATION)
.append(" section!").toString());
}
+ BdbMetaConfig tmpMetaConfig = new BdbMetaConfig();
+ logger.warn("[replication] section is deprecated. Please config in [meta_bdb] section.");
+ // read configure items
if (TStringUtils.isNotBlank(repSect.get("repGroupName"))) {
- replicationConfig.setRepGroupName(repSect.get("repGroupName").trim());
+ tmpMetaConfig.setRepGroupName(repSect.get("repGroupName").trim());
}
if (TStringUtils.isBlank(repSect.get("repNodeName"))) {
getSimilarConfigField(SECT_TOKEN_REPLICATION, configKeySet, "repNodeName");
} else {
- replicationConfig.setRepNodeName(repSect.get("repNodeName").trim());
+ tmpMetaConfig.setRepNodeName(repSect.get("repNodeName").trim());
}
if (TStringUtils.isNotBlank(repSect.get("repNodePort"))) {
- replicationConfig.setRepNodePort(getInt(repSect, "repNodePort"));
+ tmpMetaConfig.setRepNodePort(getInt(repSect, "repNodePort"));
+ }
+ // meta data path
+ final Profile.Section masterConf = iniConf.get(SECT_TOKEN_MASTER);
+ if (TStringUtils.isNotBlank(masterConf.get("metaDataPath"))) {
+ tmpMetaConfig.setMetaDataPath(masterConf.get("metaDataPath").trim());
}
if (TStringUtils.isNotBlank(repSect.get("repHelperHost"))) {
- replicationConfig.setRepHelperHost(repSect.get("repHelperHost").trim());
+ tmpMetaConfig.setRepHelperHost(repSect.get("repHelperHost").trim());
}
if (TStringUtils.isNotBlank(repSect.get("metaLocalSyncPolicy"))) {
- replicationConfig.setMetaLocalSyncPolicy(getInt(repSect, "metaLocalSyncPolicy"));
+ tmpMetaConfig.setMetaLocalSyncPolicy(getInt(repSect, "metaLocalSyncPolicy"));
}
if (TStringUtils.isNotBlank(repSect.get("metaReplicaSyncPolicy"))) {
- replicationConfig.setMetaReplicaSyncPolicy(getInt(repSect, "metaReplicaSyncPolicy"));
+ tmpMetaConfig.setMetaReplicaSyncPolicy(getInt(repSect, "metaReplicaSyncPolicy"));
}
if (TStringUtils.isNotBlank(repSect.get("repReplicaAckPolicy"))) {
- replicationConfig.setRepReplicaAckPolicy(getInt(repSect, "repReplicaAckPolicy"));
+ tmpMetaConfig.setRepReplicaAckPolicy(getInt(repSect, "repReplicaAckPolicy"));
}
if (TStringUtils.isNotBlank(repSect.get("repStatusCheckTimeoutMs"))) {
- replicationConfig.setRepStatusCheckTimeoutMs(getLong(repSect, "repStatusCheckTimeoutMs"));
+ tmpMetaConfig.setRepStatusCheckTimeoutMs(getLong(repSect, "repStatusCheckTimeoutMs"));
}
+ return tmpMetaConfig;
}
@Override
public String toString() {
return new ToStringBuilder(this)
- .append(super.toString())
.append("hostName", hostName)
.append("port", port)
.append("webPort", webPort)
+ .append("tlsConfig", tlsConfig)
+ .append("useBdbStoreMetaData", useBdbStoreMetaData)
+ .append("zkMetaConfig", zkMetaConfig)
+ .append("bdbMetaConfig", bdbMetaConfig)
.append("consumerBalancePeriodMs", consumerBalancePeriodMs)
.append("firstBalanceDelayAfterStartMs", firstBalanceDelayAfterStartMs)
.append("consumerHeartbeatTimeoutMs", consumerHeartbeatTimeoutMs)
@@ -641,17 +787,23 @@ public class MasterConfig extends AbstractFileConfig {
.append("maxGroupBrokerConsumeRate", maxGroupBrokerConsumeRate)
.append("maxGroupRebalanceWaitPeriod", maxGroupRebalanceWaitPeriod)
.append("maxAutoForbiddenCnt", maxAutoForbiddenCnt)
+ .append("socketSendBuffer", socketSendBuffer)
+ .append("socketRecvBuffer", socketRecvBuffer)
.append("startOffsetResetCheck", startOffsetResetCheck)
.append("rowLockWaitDurMs", rowLockWaitDurMs)
+ .append("startVisitTokenCheck", startVisitTokenCheck)
+ .append("startProduceAuthenticate", startProduceAuthenticate)
+ .append("startProduceAuthorize", startProduceAuthorize)
+ .append("startConsumeAuthenticate", startConsumeAuthenticate)
+ .append("startConsumeAuthorize", startConsumeAuthorize)
+ .append("visitTokenValidPeriodMs", visitTokenValidPeriodMs)
.append("needBrokerVisitAuth", needBrokerVisitAuth)
.append("useWebProxy", useWebProxy)
.append("visitName", visitName)
.append("visitPassword", visitPassword)
+ .append("authValidTimeStampPeriodMs", authValidTimeStampPeriodMs)
.append("rebalanceParallel", rebalanceParallel)
.append("maxMetaForceUpdatePeriodMs", maxMetaForceUpdatePeriodMs)
- .append(",").append(replicationConfig.toString())
- .append(",").append(tlsConfig.toString())
- .append(",").append(zkConfig.toString())
- .append("}").toString();
+ .toString();
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 75900f4..22e4217 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -173,7 +173,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.masterConfig = masterConfig;
this.masterRowLock =
new RowLock("Master-RowLock", this.masterConfig.getRowLockWaitDurMs());
- this.checkAndCreateBdbDataPath();
+ if (this.masterConfig.isUseBdbStoreMetaData()) {
+ this.chkAndCreateBdbMetaDataPath();
+ }
this.masterAddInfo =
new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort());
// register metric bean
@@ -2514,11 +2516,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
/**
- * check bdb data path, create it if not exist
+ * Check the BDB meta-data path and create it if it does not exist
*
* @throws Exception
*/
- private void checkAndCreateBdbDataPath() throws Exception {
+ private void chkAndCreateBdbMetaDataPath() throws Exception {
String bdbEnvPath = this.masterConfig.getMetaDataPath();
final File dir = new File(bdbEnvPath);
if (!dir.exists() && !dir.mkdirs()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index 9351bc4..0df9150 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -38,7 +38,7 @@ import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.apache.inlong.tubemq.server.Server;
import org.apache.inlong.tubemq.server.common.TServerConstants;
-import org.apache.inlong.tubemq.server.common.fileconfig.MasterReplicationConfig;
+import org.apache.inlong.tubemq.server.common.fileconfig.BdbMetaConfig;
import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
import org.apache.inlong.tubemq.server.common.statusdef.TopicStsChgType;
@@ -46,7 +46,7 @@ import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
import org.apache.inlong.tubemq.server.master.MasterConfig;
import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.bdbstore.MasterGroupStatus;
-import org.apache.inlong.tubemq.server.master.metamanage.metastore.MetaConfigObserver;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.ConfigObserver;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbMetaStoreServiceImpl;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.MetaStoreService;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
@@ -73,7 +73,7 @@ public class MetaDataManager implements Server {
private final TMaster tMaster;
private static final ClusterSettingEntity defClusterSetting =
new ClusterSettingEntity().fillDefaultValue();
- private final MasterReplicationConfig replicationConfig;
+ private final BdbMetaConfig replicationConfig;
private final ScheduledExecutorService scheduledExecutorService;
private final MasterGroupStatus masterGroupStatus = new MasterGroupStatus();
@@ -85,7 +85,7 @@ public class MetaDataManager implements Server {
public MetaDataManager(TMaster tMaster) {
this.tMaster = tMaster;
MasterConfig masterConfig = this.tMaster.getMasterConfig();
- this.replicationConfig = masterConfig.getReplicationConfig();
+ this.replicationConfig = masterConfig.getBdbMetaConfig();
this.metaStoreService =
new BdbMetaStoreServiceImpl(tMaster.getMasterConfig());
@@ -143,7 +143,7 @@ public class MetaDataManager implements Server {
logger.info("BrokerConfManager StoreService stopped");
}
- public void registerObserver(MetaConfigObserver eventObserver) {
+ public void registerObserver(ConfigObserver eventObserver) {
metaStoreService.registerObserver(eventObserver);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaConfigObserver.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/ConfigObserver.java
similarity index 93%
rename from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaConfigObserver.java
rename to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/ConfigObserver.java
index f428b51..5d2abff 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaConfigObserver.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/ConfigObserver.java
@@ -17,7 +17,7 @@
package org.apache.inlong.tubemq.server.master.metamanage.metastore;
-public interface MetaConfigObserver {
+public interface ConfigObserver {
void clearCacheData();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 1a3073c..9df88ab 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -277,7 +277,7 @@ public interface MetaStoreService extends KeepAliveService, Server {
boolean delGroupConsumeCtrlConf(String operator, String recordKey,
StringBuilder strBuff, ProcessResult result);
- void registerObserver(MetaConfigObserver eventObserver);
+ void registerObserver(ConfigObserver eventObserver);
boolean isTopicNameInUsed(String topicName);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
index f987509..659161e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
@@ -23,7 +23,7 @@ import java.util.Set;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
-import org.apache.inlong.tubemq.server.master.metamanage.metastore.MetaConfigObserver;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.ConfigObserver;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.KeepAliveService;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
@@ -41,7 +41,7 @@ public interface MetaConfigMapper extends KeepAliveService {
*
* @param eventObserver the event observer
*/
- void regMetaConfigObserver(MetaConfigObserver eventObserver);
+ void regMetaConfigObserver(ConfigObserver eventObserver);
boolean checkStoreStatus(boolean checkIsMaster, ProcessResult result);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index 07fb9bb..d2f4ce3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -35,7 +35,7 @@ import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
import org.apache.inlong.tubemq.server.common.utils.RowLock;
import org.apache.inlong.tubemq.server.master.MasterConfig;
import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
-import org.apache.inlong.tubemq.server.master.metamanage.metastore.MetaConfigObserver;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.ConfigObserver;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.MetaConfigMapper;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
@@ -83,7 +83,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
// group consume control configure
protected ConsumeCtrlMapper consumeCtrlMapper;
// the observers focusing on active-standby switching
- private final List<MetaConfigObserver> eventObservers = new ArrayList<>();
+ private final List<ConfigObserver> eventObservers = new ArrayList<>();
public AbsMetaConfigMapperImpl(MasterConfig masterConfig) {
this.masterConfig = masterConfig;
@@ -92,7 +92,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
}
@Override
- public void regMetaConfigObserver(MetaConfigObserver eventObserver) {
+ public void regMetaConfigObserver(ConfigObserver eventObserver) {
if (eventObserver != null) {
eventObservers.add(eventObserver);
}
@@ -1210,7 +1210,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
*/
protected void reloadMetaStore(StringBuilder strBuff) {
// Clear observers' cache data.
- for (MetaConfigObserver observer : eventObservers) {
+ for (ConfigObserver observer : eventObservers) {
observer.clearCacheData();
}
// Load the latest meta-data from persistent
@@ -1221,7 +1221,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
groupResCtrlMapper.loadConfig(strBuff);
consumeCtrlMapper.loadConfig(strBuff);
// load the latest meta-data to observers
- for (MetaConfigObserver observer : eventObservers) {
+ for (ConfigObserver observer : eventObservers) {
observer.reloadCacheData();
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java
index a0cd5f7..a7fb308 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java
@@ -52,7 +52,7 @@ import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.TokenConstants;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
-import org.apache.inlong.tubemq.server.common.fileconfig.MasterReplicationConfig;
+import org.apache.inlong.tubemq.server.common.fileconfig.BdbMetaConfig;
import org.apache.inlong.tubemq.server.master.MasterConfig;
import org.apache.inlong.tubemq.server.master.bdbstore.MasterGroupStatus;
import org.apache.inlong.tubemq.server.master.bdbstore.MasterNodeInfo;
@@ -69,7 +69,8 @@ public class BdbMetaConfigMapperImpl extends AbsMetaConfigMapperImpl {
LoggerFactory.getLogger(BdbMetaConfigMapperImpl.class);
private final MetaConfigSamplePrint metaSamplePrint =
new MetaConfigSamplePrint(logger);
-
+ // bdb meta store configure
+ private final BdbMetaConfig bdbMetaConfig;
// bdb environment configure
private final EnvironmentConfig envConfig;
// meta data store file
@@ -94,16 +95,15 @@ public class BdbMetaConfigMapperImpl extends AbsMetaConfigMapperImpl {
public BdbMetaConfigMapperImpl(MasterConfig masterConfig) {
super(masterConfig);
- MasterReplicationConfig replicationConfig =
- masterConfig.getReplicationConfig();
+ bdbMetaConfig = masterConfig.getBdbMetaConfig();
// build replicationGroupAdmin info
Set<InetSocketAddress> helpers = new HashSet<>();
for (int i = 1; i <= 3; i++) {
helpers.add(new InetSocketAddress(this.masterConfig.getHostName(),
- replicationConfig.getRepNodePort() + i));
+ bdbMetaConfig.getRepNodePort() + i));
}
this.replicationGroupAdmin =
- new ReplicationGroupAdmin(replicationConfig.getRepGroupName(), helpers);
+ new ReplicationGroupAdmin(bdbMetaConfig.getRepGroupName(), helpers);
// Initialize configuration for BDB-JE replication environment.
// Set envHome and generate a ReplicationConfig. Note that ReplicationConfig and
// EnvironmentConfig values could all be specified in the je.properties file,
@@ -115,13 +115,13 @@ public class BdbMetaConfigMapperImpl extends AbsMetaConfigMapperImpl {
// Wait up to 3 seconds for commitConsumed acknowledgments.
this.repConfig.setReplicaAckTimeout(3, TimeUnit.SECONDS);
this.repConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT, "1000");
- this.repConfig.setGroupName(replicationConfig.getRepGroupName());
- this.repConfig.setNodeName(replicationConfig.getRepNodeName());
+ this.repConfig.setGroupName(bdbMetaConfig.getRepGroupName());
+ this.repConfig.setNodeName(bdbMetaConfig.getRepNodeName());
this.repConfig.setNodeHostPort(this.masterConfig.getHostName() + TokenConstants.ATTR_SEP
- + replicationConfig.getRepNodePort());
- if (TStringUtils.isNotEmpty(replicationConfig.getRepHelperHost())) {
+ + bdbMetaConfig.getRepNodePort());
+ if (TStringUtils.isNotEmpty(bdbMetaConfig.getRepHelperHost())) {
logger.info("[BDB Impl] ADD HELP HOST");
- this.repConfig.setHelperHosts(replicationConfig.getRepHelperHost());
+ this.repConfig.setHelperHosts(bdbMetaConfig.getRepHelperHost());
}
// A replicated environment must be opened with transactions enabled.
// Environments on a master must be read/write, while environments
@@ -131,9 +131,9 @@ public class BdbMetaConfigMapperImpl extends AbsMetaConfigMapperImpl {
this.envConfig = new EnvironmentConfig();
this.envConfig.setTransactional(true);
this.envConfig.setDurability(new Durability(
- replicationConfig.getMetaLocalSyncPolicy(),
- replicationConfig.getMetaReplicaSyncPolicy(),
- replicationConfig.getRepReplicaAckPolicy()));
+ bdbMetaConfig.getMetaLocalSyncPolicy(),
+ bdbMetaConfig.getMetaReplicaSyncPolicy(),
+ bdbMetaConfig.getRepReplicaAckPolicy()));
this.envConfig.setAllowCreate(true);
// Set transactional for the replicated environment.
this.storeConfig.setTransactional(true);
@@ -157,7 +157,8 @@ public class BdbMetaConfigMapperImpl extends AbsMetaConfigMapperImpl {
}
executorService = Executors.newSingleThreadExecutor();
// build envHome file
- envHome = new File(masterConfig.getMetaDataPath());
+
+ envHome = new File(bdbMetaConfig.getMetaDataPath());
repEnv = getEnvironment();
initMetaStore(null);
repEnv.setStateChangeListener(listener);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
index 63313ef..b4e1d54 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
@@ -57,12 +57,12 @@ import org.apache.inlong.tubemq.corebase.TokenConstants;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
-import org.apache.inlong.tubemq.server.common.fileconfig.MasterReplicationConfig;
+import org.apache.inlong.tubemq.server.common.fileconfig.BdbMetaConfig;
import org.apache.inlong.tubemq.server.master.MasterConfig;
import org.apache.inlong.tubemq.server.master.bdbstore.MasterGroupStatus;
import org.apache.inlong.tubemq.server.master.bdbstore.MasterNodeInfo;
import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
-import org.apache.inlong.tubemq.server.master.metamanage.metastore.MetaConfigObserver;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.ConfigObserver;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.MetaStoreService;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
@@ -96,10 +96,10 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
// meta data store path
private final String metaDataPath;
// bdb replication configure
- private final MasterReplicationConfig replicationConfig;
+ private final BdbMetaConfig replicationConfig;
private final Listener listener = new Listener();
private ExecutorService executorService = null;
- private final List<MetaConfigObserver> eventObservers = new ArrayList<>();
+ private final List<ConfigObserver> eventObservers = new ArrayList<>();
// service status
// 0 stopped, 1 starting, 2 started, 3 stopping
private final AtomicInteger srvStatus = new AtomicInteger(0);
@@ -142,8 +142,8 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
public BdbMetaStoreServiceImpl(MasterConfig masterConfig) {
this.nodeHost = masterConfig.getHostName();
- this.metaDataPath = masterConfig.getMetaDataPath();
- this.replicationConfig = masterConfig.getReplicationConfig();
+ this.metaDataPath = masterConfig.getBdbMetaConfig().getMetaDataPath();
+ this.replicationConfig = masterConfig.getBdbMetaConfig();
// build replicationGroupAdmin info
Set<InetSocketAddress> helpers = new HashSet<>();
for (int i = 1; i <= 3; i++) {
@@ -839,7 +839,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
- public void registerObserver(MetaConfigObserver eventObserver) {
+ public void registerObserver(ConfigObserver eventObserver) {
if (eventObserver != null) {
eventObservers.add(eventObserver);
}
@@ -1143,7 +1143,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
*
* */
private void clearCachedRunData() {
- for (MetaConfigObserver observer : eventObservers) {
+ for (ConfigObserver observer : eventObservers) {
observer.clearCacheData();
}
}
@@ -1153,7 +1153,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
*
* */
private void reloadRunData() {
- for (MetaConfigObserver observer : eventObservers) {
+ for (ConfigObserver observer : eventObservers) {
observer.reloadCacheData();
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
index 82738af..b2dae4b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
@@ -81,7 +81,7 @@ public class ZKMetaConfigMapperImpl extends AbsMetaConfigMapperImpl {
super(masterConfig);
this.localNodeAdd = new NodeAddrInfo(
masterConfig.getHostName(), masterConfig.getPort());
- String tubeZkRoot = ZKUtil.normalizePath(masterConfig.getZkConfig().getZkNodeRoot());
+ String tubeZkRoot = ZKUtil.normalizePath(masterConfig.getZkMetaConfig().getZkNodeRoot());
StringBuilder strBuff =
new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
this.metaZkRoot = strBuff.append(tubeZkRoot).append(TokenConstants.SLASH)
@@ -111,7 +111,7 @@ public class ZKMetaConfigMapperImpl extends AbsMetaConfigMapperImpl {
logger.info("[ZK Impl] Starting MetaConfigService...");
// start Master select thread
executorService.scheduleWithFixedDelay(new MasterSelectorTask(), 5L,
- masterConfig.getZkConfig().getZkMasterCheckPeriodMs(), TimeUnit.MILLISECONDS);
+ masterConfig.getZkMetaConfig().getZkMasterCheckPeriodMs(), TimeUnit.MILLISECONDS);
// sleep 1 second for select
Thread.sleep(1000);
srvStatus.compareAndSet(1, 2);
@@ -139,7 +139,7 @@ public class ZKMetaConfigMapperImpl extends AbsMetaConfigMapperImpl {
closeMetaStore();
// set status
srvStatus.set(0);
- logger.info("[BDB Impl] Stop MetaConfigService, success");
+ logger.info("[ZK Impl] Stop MetaConfigService, success");
}
@Override
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index 3a4918f..175243d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -43,7 +43,7 @@ import org.apache.inlong.tubemq.server.common.utils.SerialIdUtils;
import org.apache.inlong.tubemq.server.master.MasterConfig;
import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
-import org.apache.inlong.tubemq.server.master.metamanage.metastore.MetaConfigObserver;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.ConfigObserver;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.inlong.tubemq.server.master.stats.MasterSrvStatsHolder;
import org.slf4j.Logger;
@@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory;
/*
* Broker run manager
*/
-public class DefBrokerRunManager implements BrokerRunManager, MetaConfigObserver {
+public class DefBrokerRunManager implements BrokerRunManager, ConfigObserver {
private static final Logger logger =
LoggerFactory.getLogger(DefBrokerRunManager.class);
// meta data manager
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterConfigTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterConfigTest.java
index 57e321a..7bdac57 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterConfigTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterConfigTest.java
@@ -18,8 +18,8 @@
package org.apache.inlong.tubemq.server.master;
import com.sleepycat.je.Durability;
-import org.apache.inlong.tubemq.server.common.fileconfig.MasterReplicationConfig;
-import org.apache.inlong.tubemq.server.common.fileconfig.ZKConfig;
+import org.apache.inlong.tubemq.server.common.fileconfig.BdbMetaConfig;
+import org.apache.inlong.tubemq.server.common.fileconfig.ZKMetaConfig;
import org.junit.Assert;
import org.junit.Test;
@@ -30,6 +30,42 @@ public class MasterConfigTest {
}
@Test
+ public void testMetaBdbConfig() {
+ final MasterConfig masterConfig = new MasterConfig();
+ masterConfig.loadFromFile(this.getClass()
+ .getResource("/master-meta-bdb.ini").getPath());
+
+ Assert.assertEquals(masterConfig.getMetaDataPath(), "var/meta_data");
+ Assert.assertTrue(masterConfig.isUseBdbStoreMetaData());
+ final BdbMetaConfig repConfig = masterConfig.getBdbMetaConfig();
+ Assert.assertEquals("tubemqMasterGroup", repConfig.getRepGroupName());
+ Assert.assertEquals("tubemqMasterGroupNode1", repConfig.getRepNodeName());
+ Assert.assertEquals(9001, repConfig.getRepNodePort());
+ Assert.assertEquals("127.0.0.1:9001", repConfig.getRepHelperHost());
+ Assert.assertEquals(Durability.SyncPolicy.SYNC, repConfig.getMetaLocalSyncPolicy());
+ Assert.assertEquals(Durability.SyncPolicy.WRITE_NO_SYNC, repConfig.getMetaReplicaSyncPolicy());
+ Assert.assertEquals(Durability.ReplicaAckPolicy.SIMPLE_MAJORITY, repConfig.getRepReplicaAckPolicy());
+ Assert.assertEquals(10000, repConfig.getRepStatusCheckTimeoutMs());
+ }
+
+ @Test
+ public void testMetaZooKeeperConfig() {
+ final MasterConfig masterConfig = new MasterConfig();
+ masterConfig.loadFromFile(this.getClass()
+ .getResource("/master-meta-zk.ini").getPath());
+
+ Assert.assertFalse(masterConfig.isUseBdbStoreMetaData());
+ final ZKMetaConfig zkMetaConfig = masterConfig.getZkMetaConfig();
+ Assert.assertEquals("/tubemq", zkMetaConfig.getZkNodeRoot());
+ Assert.assertEquals("localhost:2181", zkMetaConfig.getZkServerAddr());
+ Assert.assertEquals(30000, zkMetaConfig.getZkSessionTimeoutMs());
+ Assert.assertEquals(30000, zkMetaConfig.getZkConnectionTimeoutMs());
+ Assert.assertEquals(5000, zkMetaConfig.getZkSyncTimeMs());
+ Assert.assertEquals(5000, zkMetaConfig.getZkCommitPeriodMs());
+ Assert.assertEquals(4000, zkMetaConfig.getZkMasterCheckPeriodMs());
+ }
+
+ @Test
public void testNormalConfig() {
final MasterConfig masterConfig = new MasterConfig();
masterConfig.loadFromFile(this.getClass().getResource("/master-normal.ini").getPath());
@@ -46,15 +82,7 @@ public class MasterConfigTest {
Assert.assertEquals("resources", masterConfig.getWebResourcePath());
Assert.assertEquals("var/meta_data_1", masterConfig.getMetaDataPath());
- final ZKConfig zkConfig = masterConfig.getZkConfig();
- Assert.assertEquals("/tubemq", zkConfig.getZkNodeRoot());
- Assert.assertEquals("localhost:2181", zkConfig.getZkServerAddr());
- Assert.assertEquals(30000, zkConfig.getZkSessionTimeoutMs());
- Assert.assertEquals(30000, zkConfig.getZkConnectionTimeoutMs());
- Assert.assertEquals(5000, zkConfig.getZkSyncTimeMs());
- Assert.assertEquals(5000, zkConfig.getZkCommitPeriodMs());
-
- final MasterReplicationConfig repConfig = masterConfig.getReplicationConfig();
+ final BdbMetaConfig repConfig = masterConfig.getBdbMetaConfig();
Assert.assertEquals("gp1", repConfig.getRepGroupName());
Assert.assertEquals("tubemqMasterGroupNode1", repConfig.getRepNodeName());
Assert.assertEquals(9999, repConfig.getRepNodePort());
@@ -73,7 +101,7 @@ public class MasterConfigTest {
Assert.assertEquals(masterConfig.getMetaDataPath(), "var/meta_data");
- final MasterReplicationConfig repConfig = masterConfig.getReplicationConfig();
+ final BdbMetaConfig repConfig = masterConfig.getBdbMetaConfig();
Assert.assertEquals("tubemqMasterGroup", repConfig.getRepGroupName());
Assert.assertEquals("tubemqMasterGroupNode1", repConfig.getRepNodeName());
Assert.assertEquals(9001, repConfig.getRepNodePort());
@@ -92,7 +120,7 @@ public class MasterConfigTest {
Assert.assertEquals("var/tubemqMasterGroup/master_data", masterConfig.getMetaDataPath());
- final MasterReplicationConfig repConfig = masterConfig.getReplicationConfig();
+ final BdbMetaConfig repConfig = masterConfig.getBdbMetaConfig();
Assert.assertEquals("gp1", repConfig.getRepGroupName());
Assert.assertEquals("tubemqMasterGroupNode1", repConfig.getRepNodeName());
Assert.assertEquals(9999, repConfig.getRepNodePort());
diff --git a/inlong-tubemq/tubemq-server/src/test/resources/master.ini b/inlong-tubemq/tubemq-server/src/test/resources/master-bdbstore.ini
similarity index 97%
copy from inlong-tubemq/tubemq-server/src/test/resources/master.ini
copy to inlong-tubemq/tubemq-server/src/test/resources/master-bdbstore.ini
index b78eea6..0587d8d 100644
--- a/inlong-tubemq/tubemq-server/src/test/resources/master.ini
+++ b/inlong-tubemq/tubemq-server/src/test/resources/master-bdbstore.ini
@@ -1,91 +1,91 @@
-;
-; Licensed to the Apache Software Foundation (ASF) under one or more
-; contributor license agreements. See the NOTICE file distributed with
-; this work for additional information regarding copyright ownership.
-; The ASF licenses this file to You under the Apache License, Version 2.0
-; (the "License"); you may not use this file except in compliance with
-; the License. You may obtain a copy of the License at
-;
-; http://www.apache.org/licenses/LICENSE-2.0
-;
-; Unless required by applicable law or agreed to in writing, software
-; distributed under the License is distributed on an "AS IS" BASIS,
-; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-; See the License for the specific language governing permissions and
-; limitations under the License.
-;
-
-[master]
-; host address of master, required; must be configured at network card, enabled,
-; non-loopback, and cannot be 127.0.0.1
-hostName=0.0.0.0
-; port that master listens to, optional; default is 8715
-port=8000
-; port that master web console listens to
-webPort=8080
-; interval of re-balance, optional; default is 30000ms
-consumerBalancePeriodMs=30000
-; delay of first re-balance after master start; turn up when cluster grown
-firstBalanceDelayAfterStartMs=60000
-; timeout of consumer heartbeat, optional; default is 30000ms
-consumerHeartbeatTimeoutMs=30000
-; timeout of producer heartbeat, optional; default is 45000ms
-producerHeartbeatTimeoutMs=45000
-; timeout of broker heartbeat, optional; default is 25000ms
-brokerHeartbeatTimeoutMs=25000
-;configure modify authorization_token
-confModAuthToken=abc
-webResourcePath=E:\\GIT\\TubeMQ\\resources
-
-[zookeeper]
-; root path of TubeMQ znodes on ZK
-zkNodeRoot=/tubemq
-; connect string of ZK servers
-zkServerAddr=localhost:2181
-; timeout of ZK heartbeat; default is 30000ms
-zkSessionTimeoutMs=30000
-; timeout of ZK connection; default is 30000ms
-zkConnectionTimeoutMs=30000
-; sync time on ZK; default is 5000ms
-zkSyncTimeMs=5000
-; interval to commits data on ZK; default is 5000ms
-zkCommitPeriodMs=5000
-
-[bdbStore]
-;name of Berkeley DB, Java Edition(BDB-JE) replication group
-bdbRepGroupName=tubemqMasterGroup
-;name of node in BDB-JE replication group
-bdbNodeName=tubemqMasterGroupNode1
-;port for node to communicate to other nodes in replication group
-bdbNodePort=9001
-;home directory of node in replication group
-bdbEnvHome=e:/GIT/TubeMQ/tubemqMasterGroup/master_data
-;helperHost(and port) for node to join replication group the first time
-bdbHelperHost=10.2.121.42:9001
-
-; config of commit file synchronization in BDB-JE
-; 1 for SYNC, will write and synchronously flush the log to disk upon transaction commit
-; 2 for NO_SYNC, do not synchronously flush the log upon transaction commit(if application or system fails,data may lost)
-; 3 for WRITE_NO_SYNC, synchronously written to the OS's file system buffers upon transaction commit, but the data is not
-; actually forced to disk(if the system fails,data may lost)
-; commit file synchronization configuration of master node in replication group
-bdbLocalSync= 1
-; commit file synchronization configuration of replica node in replication group
-bdbReplicaSync= 3
-
-; config of ReplicaAckPolicy in BDB-JE
-; 1 for SIMPLE_MAJORITY;
-; 2 for ALL
-; 3 for NONE
-bdbReplicaAck= 1
-
-;interval for node status check task
-bdbStatusCheckTimeoutMs=10000
-
-
-
-
-
-
-
-
+;
+; Licensed to the Apache Software Foundation (ASF) under one or more
+; contributor license agreements. See the NOTICE file distributed with
+; this work for additional information regarding copyright ownership.
+; The ASF licenses this file to You under the Apache License, Version 2.0
+; (the "License"); you may not use this file except in compliance with
+; the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+;
+
+[master]
+; host address of master, required; must be configured at network card, enabled,
+; non-loopback, and cannot be 127.0.0.1
+hostName=0.0.0.0
+; port that master listens to, optional; default is 8715
+port=8000
+; port that master web console listens to
+webPort=8080
+; interval of re-balance, optional; default is 30000ms
+consumerBalancePeriodMs=30000
+; delay of first re-balance after master start; turn up when cluster grown
+firstBalanceDelayAfterStartMs=60000
+; timeout of consumer heartbeat, optional; default is 30000ms
+consumerHeartbeatTimeoutMs=30000
+; timeout of producer heartbeat, optional; default is 45000ms
+producerHeartbeatTimeoutMs=45000
+; timeout of broker heartbeat, optional; default is 25000ms
+brokerHeartbeatTimeoutMs=25000
+;configure modify authorization_token
+confModAuthToken=abc
+webResourcePath=E:\\GIT\\TubeMQ\\resources
+
+[zookeeper]
+; root path of TubeMQ znodes on ZK
+zkNodeRoot=/tubemq
+; connect string of ZK servers
+zkServerAddr=localhost:2181
+; timeout of ZK heartbeat; default is 30000ms
+zkSessionTimeoutMs=30000
+; timeout of ZK connection; default is 30000ms
+zkConnectionTimeoutMs=30000
+; sync time on ZK; default is 5000ms
+zkSyncTimeMs=5000
+; interval to commit data on ZK; default is 5000ms
+zkCommitPeriodMs=5000
+
+[bdbStore]
+;name of Berkeley DB, Java Edition(BDB-JE) replication group
+bdbRepGroupName=tubemqMasterGroup
+;name of node in BDB-JE replication group
+bdbNodeName=tubemqMasterGroupNode1
+;port for node to communicate to other nodes in replication group
+bdbNodePort=9001
+;home directory of node in replication group
+bdbEnvHome=e:/GIT/TubeMQ/tubemqMasterGroup/master_data
+;helperHost(and port) for node to join replication group the first time
+bdbHelperHost=10.2.121.42:9001
+
+; config of commit file synchronization in BDB-JE
+; 1 for SYNC, will write and synchronously flush the log to disk upon transaction commit
+; 2 for NO_SYNC, do not synchronously flush the log upon transaction commit (if the application or system fails, data may be lost)
+; 3 for WRITE_NO_SYNC, synchronously written to the OS's file system buffers upon transaction commit, but the data is not
+; actually forced to disk (if the system fails, data may be lost)
+; commit file synchronization configuration of master node in replication group
+bdbLocalSync= 1
+; commit file synchronization configuration of replica node in replication group
+bdbReplicaSync= 3
+
+; config of ReplicaAckPolicy in BDB-JE
+; 1 for SIMPLE_MAJORITY;
+; 2 for ALL
+; 3 for NONE
+bdbReplicaAck= 1
+
+;interval for node status check task
+bdbStatusCheckTimeoutMs=10000
+
+
+
+
+
+
+
+
diff --git a/inlong-tubemq/conf/master.ini b/inlong-tubemq/tubemq-server/src/test/resources/master-meta-bdb.ini
similarity index 87%
copy from inlong-tubemq/conf/master.ini
copy to inlong-tubemq/tubemq-server/src/test/resources/master-meta-bdb.ini
index b58a83d..1f1f1c0 100644
--- a/inlong-tubemq/conf/master.ini
+++ b/inlong-tubemq/tubemq-server/src/test/resources/master-meta-bdb.ini
@@ -1,94 +1,85 @@
-;
-; Licensed to the Apache Software Foundation (ASF) under one or more
-; contributor license agreements. See the NOTICE file distributed with
-; this work for additional information regarding copyright ownership.
-; The ASF licenses this file to You under the Apache License, Version 2.0
-; (the "License"); you may not use this file except in compliance with
-; the License. You may obtain a copy of the License at
-;
-; http://www.apache.org/licenses/LICENSE-2.0
-;
-; Unless required by applicable law or agreed to in writing, software
-; distributed under the License is distributed on an "AS IS" BASIS,
-; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-; See the License for the specific language governing permissions and
-; limitations under the License.
-;
-
-[master]
-; host address of master, required; must be configured at network card, enabled
-hostName=127.0.0.1
-; port that master listens to, optional; default is 8715
-port=8715
-; port that master web console listens to
-webPort=8080
-; interval of re-balance, optional; default is 30000ms
-consumerBalancePeriodMs=30000
-; delay of first re-balance after master start; turn up when cluster grown
-firstBalanceDelayAfterStartMs=60000
-; timeout of consumer heartbeat, optional; default is 30000ms
-consumerHeartbeatTimeoutMs=30000
-; timeout of producer heartbeat, optional; default is 45000ms
-producerHeartbeatTimeoutMs=45000
-; timeout of broker heartbeat, optional; default is 25000ms
-brokerHeartbeatTimeoutMs=25000
-; configure modify authorization_token
-confModAuthToken=abc
-webResourcePath=resources
-; configure useWebProxy
-useWebProxy=false
-
-; meta data path; can be absolute, or relative to TubeMQ base directory ($BASE_DIR)
-; optional, default is "var/meta_data"
-; should be the same to `[bdbStore].bdbEnvHome` if upgrade from version prior 0.5.0
-;metaDataPath=var/meta_data
-
-
-[zookeeper]
-; root path of TubeMQ znodes on ZK
-zkNodeRoot=/tubemq
-; connect string of ZK servers
-zkServerAddr=localhost:2181
-; timeout of ZK heartbeat; default is 30000ms
-zkSessionTimeoutMs=30000
-; timeout of ZK connection; default is 30000ms
-zkConnectionTimeoutMs=30000
-; sync time on ZK; default is 5000ms
-zkSyncTimeMs=5000
-; interval to commits data on ZK; default is 5000ms
-zkCommitPeriodMs=5000
-
-
-[replication]
-; name of replication group, default is `tubemqMasterGroup`, you'd better set individual value for every tubeMQ cluster
-;repGroupName=tubemqMasterGroup
-; name of current node; MUST BE DIFFERENT for every node in the same group
-repNodeName=tubemqMasterGroupNode1
-; port for node to communicate to other nodes in replication group, default is 9001
-;repNodePort=9001
-; helperHost(and port) for node to join master cluster and the port should keep consistent with `repNodePort`; for the
-; first time of starting, this value for every node in master cluster should keep same
-; the default is 127.0.0.1:9001
-;repHelperHost=masterHostName:9001
-
-; meta data disk sync policy
-; the overall durability is a function of metaLocalSyncPolicy plus the repReplicaAckPolicy used by the master,
-; and the metaReplicaSyncPolicy in effect at each Replica
-; see https://docs.oracle.com/cd/E17277_02/html/java/com/sleepycat/je/Durability.html for detail
-; 1 for SYNC, will write and synchronously flush the log to disk upon transaction commit
-; 2 for NO_SYNC, do not synchronously flush the log upon transaction commit (if application or system fails, data may lost)
-; 3 for WRITE_NO_SYNC, synchronously written to the OS's file system buffers upon transaction commit, but the data is not
-; actually forced to disk(if the system fails,data may lost)
-; sync policy for "local", optional; default is 1(SYNC)
-;metaLocalSyncPolicy=1
-; sync policy for "replica", optional; default is 3(WRITE_NO_SYNC)
-;metaReplicaSyncPolicy=3
-
-; replication acknowledge policy, optional; default is 1(SIMPLE_MAJORITY)
-; 1 for SIMPLE_MAJORITY
-; 2 for ALL
-; 3 for NONE
-;repReplicaAckPolicy=1
-
-; interval for node status check task, optional; default is 10000(ms)
-;repStatusCheckTimeoutMs=10000
+;
+; Licensed to the Apache Software Foundation (ASF) under one or more
+; contributor license agreements. See the NOTICE file distributed with
+; this work for additional information regarding copyright ownership.
+; The ASF licenses this file to You under the Apache License, Version 2.0
+; (the "License"); you may not use this file except in compliance with
+; the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+;
+
+[master]
+; host address of master, required; must be configured at network card, enabled
+hostName=127.0.0.1
+; port that master listens to, optional; default is 8715
+port=8715
+; port that master web console listens to
+webPort=8080
+; interval of re-balance, optional; default is 30000ms
+consumerBalancePeriodMs=30000
+; delay of first re-balance after master start; turn up when cluster grown
+firstBalanceDelayAfterStartMs=60000
+; timeout of consumer heartbeat, optional; default is 30000ms
+consumerHeartbeatTimeoutMs=30000
+; timeout of producer heartbeat, optional; default is 45000ms
+producerHeartbeatTimeoutMs=45000
+; timeout of broker heartbeat, optional; default is 25000ms
+brokerHeartbeatTimeoutMs=25000
+; configure modify authorization_token
+confModAuthToken=abc
+webResourcePath=resources
+; configure useWebProxy
+useWebProxy=false
+
+
+[meta_bdb]
+; name of replication group, default is `tubemqMasterGroup`; it is recommended to set a distinct value for each TubeMQ cluster
+;repGroupName=tubemqMasterGroup
+
+; name of current node; MUST BE DIFFERENT for every node in the same group
+repNodeName=tubemqMasterGroupNode1
+
+; port for node to communicate to other nodes in replication group, default is 9001
+;repNodePort=9001
+
+; bdb meta data path; can be absolute, or relative to TubeMQ base directory ($BASE_DIR)
+; optional, default is "var/meta_data"
+; should be the same as `[bdbStore].bdbEnvHome` if upgrading from a version prior to 0.5.0,
+; or `[master].metaDataPath` if upgrading from a version prior to 1.0.0
+;metaDataPath=var/meta_data
+
+; helperHost (and port) for the node to join the master cluster; the port should be consistent with `repNodePort`.
+; When starting for the first time, this value should be the same for every node in the master cluster;
+; the default is 127.0.0.1:9001
+;repHelperHost=masterHostName:9001
+
+; meta data disk sync policy
+; the overall durability is a function of metaLocalSyncPolicy plus the repReplicaAckPolicy used by the master,
+; and the metaReplicaSyncPolicy in effect at each Replica
+; see https://docs.oracle.com/cd/E17277_02/html/java/com/sleepycat/je/Durability.html for detail
+; 1 for SYNC, will write and synchronously flush the log to disk upon transaction commit
+; 2 for NO_SYNC, do not synchronously flush the log upon transaction commit (if the application or system fails, data may be lost)
+; 3 for WRITE_NO_SYNC, synchronously written to the OS's file system buffers upon transaction commit, but the data is not
+; actually forced to disk (if the system fails, data may be lost)
+; sync policy for "local", optional; default is 1(SYNC)
+;metaLocalSyncPolicy=1
+
+; sync policy for "replica", optional; default is 3(WRITE_NO_SYNC)
+;metaReplicaSyncPolicy=3
+
+; replication acknowledge policy, optional; default is 1(SIMPLE_MAJORITY)
+; 1 for SIMPLE_MAJORITY
+; 2 for ALL
+; 3 for NONE
+;repReplicaAckPolicy=1
+
+; interval for node status check task, optional; default is 10000(ms)
+;repStatusCheckTimeoutMs=10000
+
diff --git a/inlong-tubemq/tubemq-server/src/test/resources/master.ini b/inlong-tubemq/tubemq-server/src/test/resources/master-meta-zk.ini
similarity index 55%
rename from inlong-tubemq/tubemq-server/src/test/resources/master.ini
rename to inlong-tubemq/tubemq-server/src/test/resources/master-meta-zk.ini
index b78eea6..7317527 100644
--- a/inlong-tubemq/tubemq-server/src/test/resources/master.ini
+++ b/inlong-tubemq/tubemq-server/src/test/resources/master-meta-zk.ini
@@ -1,91 +1,57 @@
-;
-; Licensed to the Apache Software Foundation (ASF) under one or more
-; contributor license agreements. See the NOTICE file distributed with
-; this work for additional information regarding copyright ownership.
-; The ASF licenses this file to You under the Apache License, Version 2.0
-; (the "License"); you may not use this file except in compliance with
-; the License. You may obtain a copy of the License at
-;
-; http://www.apache.org/licenses/LICENSE-2.0
-;
-; Unless required by applicable law or agreed to in writing, software
-; distributed under the License is distributed on an "AS IS" BASIS,
-; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-; See the License for the specific language governing permissions and
-; limitations under the License.
-;
-
-[master]
-; host address of master, required; must be configured at network card, enabled,
-; non-loopback, and cannot be 127.0.0.1
-hostName=0.0.0.0
-; port that master listens to, optional; default is 8715
-port=8000
-; port that master web console listens to
-webPort=8080
-; interval of re-balance, optional; default is 30000ms
-consumerBalancePeriodMs=30000
-; delay of first re-balance after master start; turn up when cluster grown
-firstBalanceDelayAfterStartMs=60000
-; timeout of consumer heartbeat, optional; default is 30000ms
-consumerHeartbeatTimeoutMs=30000
-; timeout of producer heartbeat, optional; default is 45000ms
-producerHeartbeatTimeoutMs=45000
-; timeout of broker heartbeat, optional; default is 25000ms
-brokerHeartbeatTimeoutMs=25000
-;configure modify authorization_token
-confModAuthToken=abc
-webResourcePath=E:\\GIT\\TubeMQ\\resources
-
-[zookeeper]
-; root path of TubeMQ znodes on ZK
-zkNodeRoot=/tubemq
-; connect string of ZK servers
-zkServerAddr=localhost:2181
-; timeout of ZK heartbeat; default is 30000ms
-zkSessionTimeoutMs=30000
-; timeout of ZK connection; default is 30000ms
-zkConnectionTimeoutMs=30000
-; sync time on ZK; default is 5000ms
-zkSyncTimeMs=5000
-; interval to commits data on ZK; default is 5000ms
-zkCommitPeriodMs=5000
-
-[bdbStore]
-;name of Berkeley DB, Java Edition(BDB-JE) replication group
-bdbRepGroupName=tubemqMasterGroup
-;name of node in BDB-JE replication group
-bdbNodeName=tubemqMasterGroupNode1
-;port for node to communicate to other nodes in replication group
-bdbNodePort=9001
-;home directory of node in replication group
-bdbEnvHome=e:/GIT/TubeMQ/tubemqMasterGroup/master_data
-;helperHost(and port) for node to join replication group the first time
-bdbHelperHost=10.2.121.42:9001
-
-; config of commit file synchronization in BDB-JE
-; 1 for SYNC, will write and synchronously flush the log to disk upon transaction commit
-; 2 for NO_SYNC, do not synchronously flush the log upon transaction commit(if application or system fails,data may lost)
-; 3 for WRITE_NO_SYNC, synchronously written to the OS's file system buffers upon transaction commit, but the data is not
-; actually forced to disk(if the system fails,data may lost)
-; commit file synchronization configuration of master node in replication group
-bdbLocalSync= 1
-; commit file synchronization configuration of replica node in replication group
-bdbReplicaSync= 3
-
-; config of ReplicaAckPolicy in BDB-JE
-; 1 for SIMPLE_MAJORITY;
-; 2 for ALL
-; 3 for NONE
-bdbReplicaAck= 1
-
-;interval for node status check task
-bdbStatusCheckTimeoutMs=10000
-
-
-
-
-
-
-
-
+;
+; Licensed to the Apache Software Foundation (ASF) under one or more
+; contributor license agreements. See the NOTICE file distributed with
+; this work for additional information regarding copyright ownership.
+; The ASF licenses this file to You under the Apache License, Version 2.0
+; (the "License"); you may not use this file except in compliance with
+; the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+;
+
+[master]
+; host address of master, required; must be configured at network card, enabled
+hostName=127.0.0.1
+; port that master listens to, optional; default is 8715
+port=8715
+; port that master web console listens to
+webPort=8080
+; interval of re-balance, optional; default is 30000ms
+consumerBalancePeriodMs=30000
+; delay of first re-balance after master start; turn up when cluster grown
+firstBalanceDelayAfterStartMs=60000
+; timeout of consumer heartbeat, optional; default is 30000ms
+consumerHeartbeatTimeoutMs=30000
+; timeout of producer heartbeat, optional; default is 45000ms
+producerHeartbeatTimeoutMs=45000
+; timeout of broker heartbeat, optional; default is 25000ms
+brokerHeartbeatTimeoutMs=25000
+; configure modify authorization_token
+confModAuthToken=abc
+webResourcePath=resources
+; configure useWebProxy
+useWebProxy=false
+
+[meta_zookeeper]
+; root path of TubeMQ znodes on ZK
+zkNodeRoot=/tubemq
+; connect string of ZK servers
+zkServerAddr=localhost:2181
+; timeout of ZK heartbeat; default is 30000ms
+zkSessionTimeoutMs=30000
+; timeout of ZK connection; default is 30000ms
+zkConnectionTimeoutMs=30000
+; sync time on ZK; default is 5000ms
+zkSyncTimeMs=5000
+; interval to commit data on ZK; default is 5000ms
+zkCommitPeriodMs=5000
+; period of the master selection status check; default is 5000ms
+zkMasterCheckPeriodMs=4000
+
+