You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/03/02 14:48:31 UTC
[incubator-inlong] branch master updated: [INLONG-2776][TubeMQ] Add metadata backup cli script (#2843)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a155b57 [INLONG-2776][TubeMQ] Add metadata backup cli script (#2843)
a155b57 is described below
commit a155b57c4f183cd041f65b13eab5d16f2a51d731
Author: gosonzhang <46...@qq.com>
AuthorDate: Wed Mar 2 22:47:46 2022 +0800
[INLONG-2776][TubeMQ] Add metadata backup cli script (#2843)
---
inlong-tubemq/bin/tubemq-metadata-bru.sh | 41 +
.../tubemq/corebase/metric/TrafficStatsUnit.java | 6 +
.../server/broker/msgstore/MessageStore.java | 26 +
.../tubemq/server/common/fielddef/CliArgDef.java | 13 +-
.../metastore/dao/entity/BaseEntity.java | 55 +-
.../metastore/dao/entity/BrokerConfEntity.java | 3 +-
.../metastore/dao/entity/ClusterSettingEntity.java | 49 +-
.../dao/entity/GroupConsumeCtrlEntity.java | 3 +-
.../metastore/dao/entity/GroupResCtrlEntity.java | 3 +-
.../metastore/dao/entity/TopicCtrlEntity.java | 3 +-
.../metastore/dao/entity/TopicDeployEntity.java | 3 +-
.../metastore/dao/entity/TopicPropGroup.java | 61 +
.../tubemq/server/tools/cli/CliMetaDataBRU.java | 1477 ++++++++++++++++++++
.../metastore/dao/entity/BaseEntityTest.java | 45 +
.../dao/entity/ClusterSettingEntityTest.java | 78 ++
.../metastore/dao/entity/TopicPropGroupTest.java | 50 +-
16 files changed, 1900 insertions(+), 16 deletions(-)
diff --git a/inlong-tubemq/bin/tubemq-metadata-bru.sh b/inlong-tubemq/bin/tubemq-metadata-bru.sh
new file mode 100755
index 0000000..fa264ad
--- /dev/null
+++ b/inlong-tubemq/bin/tubemq-metadata-bru.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#project directory
+if [ -z "$BASE_DIR" ] ; then
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+ BASE_DIR=`dirname "$PRG"`/..
+
+ # make it fully qualified
+ BASE_DIR=`cd "$BASE_DIR" && pwd`
+ #echo "TubeMQ master is at $BASE_DIR"
+fi
+source $BASE_DIR/bin/env.sh
+$JAVA $TOOLS_ARGS org.apache.inlong.tubemq.server.tools.cli.CliMetaDataBRU $@
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
index 69443d2..344b682 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
@@ -58,6 +58,12 @@ public class TrafficStatsUnit {
this.msgSize.addValue(msgSize);
}
+ /**
+ * Get traffic information in json format.
+ *
+ * @param strBuff the string information container
+ * @param resetValue whether reset value
+ */
public void getValue(StringBuilder strBuff, boolean resetValue) {
if (!TStringUtils.isEmpty(this.trafficName)) {
strBuff.append("\"").append(this.trafficName).append("\":");
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 0dfc0a5..ffe4ecc 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -613,6 +613,12 @@ public class MessageStore implements Closeable {
return this.msgFileStore.getIndexMaxHighOffset();
}
+ /**
+ * Get the index max offset
+ * Read from cache settings if memory cache is enabled, otherwise directly from file store
+ *
+ * @return the current index offset
+ */
public long getIndexMaxOffset() {
long lastOffset = 0L;
if (tubeConfig.isEnableMemStore()) {
@@ -636,6 +642,12 @@ public class MessageStore implements Closeable {
return this.msgFileStore.getDataMinOffset();
}
+ /**
+ * Get the data max offset
+ * Read from cache settings if memory cache is enabled, otherwise directly from file store
+ *
+ * @return the current data offset
+ */
public long getDataMaxOffset() {
long lastOffset = 0L;
if (tubeConfig.isEnableMemStore()) {
@@ -651,6 +663,13 @@ public class MessageStore implements Closeable {
return lastOffset;
}
+ /**
+ * Get the index total size
+ * If memory cache is enabled, the data in the cache is counted first,
+ * and then the data in the file is counted
+ *
+ * @return the current index total size
+ */
public long getIndexStoreSize() {
long totalSize = 0L;
if (!tubeConfig.isEnableMemStore()) {
@@ -670,6 +689,13 @@ public class MessageStore implements Closeable {
return totalSize;
}
+ /**
+ * Get the data total size
+ * If memory cache is enabled, the data in the cache is counted first,
+ * and then the data in the file is counted
+ *
+ * @return the current data total size
+ */
public long getDataStoreSize() {
long totalSize = 0L;
if (!tubeConfig.isEnableMemStore()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/CliArgDef.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/CliArgDef.java
index 85c9772..f87336e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/CliArgDef.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/CliArgDef.java
@@ -101,7 +101,18 @@ public enum CliArgDef {
"Return http's methods."),
FILEPATH("f", "file",
"String: file path.",
- "File path.");
+ "File path."),
+ METAFILEPATH(null, "meta-file-path",
+ "String: backup or recovery file path.",
+ "File path to backup or restore metadata."
+ + " Defaults value is the current path where the program is running."),
+ OPERATIONTYPE(null, "operation-type",
+ "String: operation type, include [backup, recovery]",
+ "Types of operations on metadata"),
+ AUTHTOKEN(null, "auth-token",
+ "String: API operation authorization code",
+ "API operation authorization code,"
+ + " required when adding or modifying, optional when querying");
CliArgDef(String opt, String longOpt, String optDesc) {
this(opt, longOpt, false, "", optDesc);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
index eda3af4..e287dad 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
@@ -21,6 +21,7 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.Serializable;
import java.util.Date;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
@@ -269,6 +270,36 @@ public class BaseEntity implements Serializable, Cloneable {
return sBuilder;
}
+ /**
+ * Get field value to key and value format.
+ *
+ * @param paramMap build container
+ * @param isLongName if return field key is long name
+ */
+ public void getConfigureInfo(Map<String, String> paramMap,
+ boolean isLongName) {
+ if (dataVersionId != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "dataVersionId" : "dVerId"),
+ String.valueOf(dataVersionId));
+ }
+ if (serialId.get() != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "serialId" : "serialId"),
+ String.valueOf(serialId.get()));
+ }
+ if (TStringUtils.isNotBlank(createUser)) {
+ paramMap.put((isLongName ? "createUser" : "cur"), createUser);
+ }
+ if (TStringUtils.isNotBlank(createDateStr)) {
+ paramMap.put((isLongName ? "createDate" : "cDate"), createDateStr);
+ }
+ if (TStringUtils.isNotBlank(modifyUser)) {
+ paramMap.put((isLongName ? "modifyUser" : "mur"), modifyUser);
+ }
+ if (TStringUtils.isNotBlank(modifyDateStr)) {
+ paramMap.put((isLongName ? "modifyDate" : "mDate"), modifyDateStr);
+ }
+ }
+
private void setModifyDate(Date date) {
if (date == null) {
return;
@@ -285,6 +316,21 @@ public class BaseEntity implements Serializable, Cloneable {
this.createDateStr = DateTimeConvertUtils.date2yyyyMMddHHmmss(date);
}
+ /**
+ * Check if data field values are equal
+ *
+ * @param other check object
+ * @return if equals
+ */
+ public boolean isDataEquals(BaseEntity other) {
+ return dataVersionId == other.dataVersionId
+ && Objects.equals(createUser, other.createUser)
+ && Objects.equals(createDateStr, other.createDateStr)
+ && Objects.equals(modifyUser, other.modifyUser)
+ && Objects.equals(modifyDateStr, other.modifyDateStr);
+ // && Objects.equals(attributes, other.attributes);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -294,13 +340,8 @@ public class BaseEntity implements Serializable, Cloneable {
return false;
}
BaseEntity that = (BaseEntity) o;
- return dataVersionId == that.dataVersionId
- && serialId.get() == that.serialId.get()
- && Objects.equals(createUser, that.createUser)
- && Objects.equals(createDate, that.createDate)
- && Objects.equals(modifyUser, that.modifyUser)
- && Objects.equals(modifyDate, that.modifyDate)
- && Objects.equals(attributes, that.attributes);
+ return (serialId.get() == that.serialId.get())
+ && isDataEquals(that);
}
@Override
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 4ea3977..9a888b4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -408,7 +408,8 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
* @return if equals
*/
public boolean isDataEquals(BrokerConfEntity other) {
- return brokerId == other.brokerId
+ return super.isDataEquals(other)
+ && brokerId == other.brokerId
&& brokerPort == other.brokerPort
&& brokerTLSPort == other.brokerTLSPort
&& brokerWebPort == other.brokerWebPort
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
index 636e255..ef624e1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
@@ -17,6 +17,7 @@
package org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity;
+import java.util.Map;
import java.util.Objects;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.utils.SettingValidUtils;
@@ -263,7 +264,8 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
* @return if equals
*/
public boolean isDataEquals(ClusterSettingEntity other) {
- return brokerPort == other.brokerPort
+ return super.isDataEquals(other)
+ && brokerPort == other.brokerPort
&& brokerTLSPort == other.brokerTLSPort
&& brokerWebPort == other.brokerWebPort
&& maxMsgSizeInB == other.maxMsgSizeInB
@@ -314,11 +316,54 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
}
/**
+ * Get field value to key and value format.
+ *
+ * @param paramMap output map
+ * @param isLongName if return field key is long name
+ */
+ public void getConfigureInfo(Map<String, String> paramMap,
+ boolean isLongName) {
+ if (brokerPort != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "brokerPort" : "bPort"),
+ String.valueOf(brokerPort));
+ }
+ if (brokerTLSPort != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "brokerTLSPort" : "bTlsPort"),
+ String.valueOf(brokerTLSPort));
+ }
+ if (brokerWebPort != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "brokerWebPort" : "bWebPort"),
+ String.valueOf(brokerWebPort));
+ }
+ if (maxMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "maxMsgSizeInMB" : "mxMsgInMB"),
+ String.valueOf(maxMsgSizeInMB));
+ }
+ if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "qryPriorityId" : "qryPriId"),
+ String.valueOf(qryPriorityId));
+ }
+ if (gloFlowCtrlStatus != EnableStatus.STATUS_UNDEFINE) {
+ paramMap.put((isLongName ? "flowCtrlEnable" : "fCtrlEn"),
+ String.valueOf(gloFlowCtrlStatus.isEnable()));
+ }
+ if (gloFlowCtrlRuleCnt != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "flowCtrlRuleCount" : "fCtrlCnt"),
+ String.valueOf(gloFlowCtrlRuleCnt));
+ }
+ if (TStringUtils.isNotBlank(gloFlowCtrlRuleInfo)) {
+ paramMap.put((isLongName ? "flowCtrlInfo" : "fCtrlInfo"), gloFlowCtrlRuleInfo);
+ }
+ clsDefTopicProps.getConfigureInfo(paramMap, isLongName);
+ super.getConfigureInfo(paramMap, isLongName);
+ }
+
+ /**
* Serialize field to old version json format
*
* @param sBuilder build container
* @param isLongName if return field key is long name
- * @return
+ * @return the build result
*/
public StringBuilder toOldVerFlowCtrlWebJsonStr(StringBuilder sBuilder,
boolean isLongName) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
index 4638a18..3b7faa8 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
@@ -283,7 +283,8 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
* @return if equals
*/
public boolean isDataEquals(GroupConsumeCtrlEntity other) {
- return recordKey.equals(other.recordKey)
+ return super.isDataEquals(other)
+ && recordKey.equals(other.recordKey)
&& Objects.equals(topicName, other.topicName)
&& Objects.equals(groupName, other.groupName)
&& consumeEnable == other.consumeEnable
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
index 30b661f..61c75ae 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
@@ -343,7 +343,8 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
* @return if equals
*/
public boolean isDataEquals(GroupResCtrlEntity other) {
- return allowedBrokerClientRate == other.allowedBrokerClientRate
+ return super.isDataEquals(other)
+ && allowedBrokerClientRate == other.allowedBrokerClientRate
&& qryPriorityId == other.qryPriorityId
&& ruleCnt == other.ruleCnt
&& groupName.equals(other.groupName)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
index a521ba0..077fbff 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
@@ -256,7 +256,8 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
* @return if equals
*/
public boolean isDataEquals(TopicCtrlEntity other) {
- return topicNameId == other.topicNameId
+ return super.isDataEquals(other)
+ && topicNameId == other.topicNameId
&& maxMsgSizeInB == other.maxMsgSizeInB
&& topicName.equals(other.topicName)
&& authCtrlStatus == other.authCtrlStatus;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index b438545..dcd328d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -321,7 +321,8 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
* @return if equals
*/
public boolean isDataEquals(TopicDeployEntity other) {
- return brokerId == other.brokerId
+ return super.isDataEquals(other)
+ && brokerId == other.brokerId
&& brokerPort == other.brokerPort
&& topicNameId == other.topicNameId
&& recordKey.equals(other.recordKey)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
index e9eee48..8e2c895 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
@@ -18,6 +18,7 @@
package org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity;
import java.io.Serializable;
+import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
@@ -274,6 +275,66 @@ public class TopicPropGroup implements Serializable, Cloneable {
}
/**
+ * Get field value in key and value format
+ *
+ * @param paramMap the value container
+ * @param isLongName whether long field name
+ */
+ public void getConfigureInfo(Map<String, String> paramMap,
+ boolean isLongName) {
+ if (numTopicStores != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "numTopicStores" : "numStore"),
+ String.valueOf(numTopicStores));
+ }
+ if (numPartitions != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "numPartitions" : "numPart"),
+ String.valueOf(numPartitions));
+ }
+ if (unflushThreshold != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "unflushThreshold" : "unfDskMsgCnt"),
+ String.valueOf(unflushThreshold));
+ }
+ if (unflushInterval != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "unflushInterval" : "unfDskInt"),
+ String.valueOf(unflushInterval));
+ }
+ if (unflushDataHold != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "unflushDataHold" : "unfDskDataSz"),
+ String.valueOf(unflushDataHold));
+ }
+ if (memCacheMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "memCacheMsgSizeInMB" : "cacheInMB"),
+ String.valueOf(memCacheMsgSizeInMB));
+ }
+ if (memCacheMsgCntInK != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "memCacheMsgCntInK" : "unfMemMsgCnt"),
+ String.valueOf(memCacheMsgCntInK));
+ }
+ if (memCacheFlushIntvl != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "memCacheFlushIntvl" : "unfMemInt"),
+ String.valueOf(memCacheFlushIntvl));
+ }
+ if (acceptPublish != null) {
+ paramMap.put((isLongName ? "acceptPublish" : "accPub"),
+ String.valueOf(acceptPublish));
+ }
+ if (acceptSubscribe != null) {
+ paramMap.put((isLongName ? "acceptSubscribe" : "accSub"),
+ String.valueOf(acceptSubscribe));
+ }
+ if (TStringUtils.isNotBlank(deletePolicy)) {
+ paramMap.put((isLongName ? "deletePolicy" : "delPol"), deletePolicy);
+ }
+ if (dataStoreType != TBaseConstants.META_VALUE_UNDEFINED) {
+ paramMap.put((isLongName ? "dataStoreType" : "dStType"),
+ String.valueOf(dataStoreType));
+ }
+ if (TStringUtils.isNotBlank(dataPath)) {
+ paramMap.put((isLongName ? "dataPath" : "dPath"), dataPath);
+ }
+ }
+
+ /**
* fill fields with default value
*
* @return object
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java
new file mode 100644
index 0000000..5eaf707
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java
@@ -0,0 +1,1477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.inlong.tubemq.corebase.cluster.MasterInfo;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
+import org.apache.inlong.tubemq.server.common.TServerConstants;
+import org.apache.inlong.tubemq.server.common.fielddef.CliArgDef;
+import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.inlong.tubemq.server.common.utils.HttpUtils;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CliMetaDataBRU, metadata backup and recovery utility.
+ * The utility class for script #{bin/tubemq-metadata-bru.sh} to
+ * backup and recovery metadata from Masters.
+ *
+ */
+public class CliMetaDataBRU extends CliAbstractBase {
+
+ private static final Logger logger = LoggerFactory.getLogger(CliMetaDataBRU.class);
+ private static final List<String> allowedOpTypeList = Arrays.asList("backup", "recovery");
+ private static final int maxDataLength = 150000;
+ private static final String curOperator = "SystemCliAdmin";
+ // cli parameters
+ private String masterServers = null;
+ private String operationType = null;
+ private String authToken = null;
+ private String backupAndRecoveryPath = "./meta_backup";
+ private File metaDataDir = null;
+ private MasterInfo masterInfo;
+
+ public CliMetaDataBRU() {
+ super("tubemq-metadata-bru.sh");
+ initCommandOptions();
+ }
+
+ /**
+ * Init command options
+ */
+ @Override
+ protected void initCommandOptions() {
+ // add the cli required parameters
+ addCommandOption(CliArgDef.MASTERSERVER);
+ addCommandOption(CliArgDef.OPERATIONTYPE);
+ addCommandOption(CliArgDef.METAFILEPATH);
+ addCommandOption(CliArgDef.AUTHTOKEN);
+ }
+
+ @Override
+ public boolean processParams(String[] args) throws Exception {
+ // parse parameters and check value
+ CommandLine cli = parser.parse(options, args);
+ if (cli == null) {
+ throw new org.apache.commons.cli.ParseException("Parse args failure");
+ }
+ if (cli.hasOption(CliArgDef.VERSION.longOpt)) {
+ version();
+ }
+ if (cli.hasOption(CliArgDef.HELP.longOpt)) {
+ help();
+ }
+ // get master-addresses
+ if (!cli.hasOption(CliArgDef.MASTERSERVER.longOpt)) {
+ throw new Exception(CliArgDef.MASTERSERVER.longOpt + " is required!");
+ }
+ masterServers = cli.getOptionValue(CliArgDef.MASTERSERVER.longOpt);
+ if (TStringUtils.isBlank(masterServers)) {
+ throw new Exception(CliArgDef.MASTERSERVER.longOpt + " is not allowed blank!");
+ }
+ masterInfo = new MasterInfo(masterServers.trim());
+ // get operation-type
+ if (!cli.hasOption(CliArgDef.OPERATIONTYPE.longOpt)) {
+ throw new Exception(CliArgDef.OPERATIONTYPE.longOpt + " is required!");
+ }
+ operationType = cli.getOptionValue(CliArgDef.OPERATIONTYPE.longOpt);
+ if (TStringUtils.isBlank(operationType)) {
+ throw new Exception(CliArgDef.OPERATIONTYPE.longOpt + " is not allowed blank!");
+ }
+ if (!allowedOpTypeList.contains(operationType)) {
+ throw new Exception(CliArgDef.OPERATIONTYPE.longOpt
+ + " only supports " + allowedOpTypeList.toString());
+ }
+ // get metadata backup and recovery path
+ if (cli.hasOption(CliArgDef.METAFILEPATH.longOpt)) {
+ backupAndRecoveryPath = cli.getOptionValue(CliArgDef.METAFILEPATH.longOpt);
+ if (TStringUtils.isBlank(backupAndRecoveryPath)) {
+ throw new Exception(CliArgDef.METAFILEPATH.longOpt + " is not allowed blank!");
+ }
+ }
+ // validate path directory
+ metaDataDir = new File(backupAndRecoveryPath);
+ if (!metaDataDir.exists()) {
+ if (operationType.equalsIgnoreCase("backup")) {
+ if (!metaDataDir.mkdirs()) {
+ throw new IOException(new StringBuilder(512)
+ .append("Creates the directory named ")
+ .append(metaDataDir.getAbsolutePath())
+ .append(" failure!").toString());
+ }
+ } else {
+ throw new RuntimeException(new StringBuilder(512)
+ .append("Path ").append(backupAndRecoveryPath)
+ .append(" is not existed!").toString());
+ }
+ }
+ if (!metaDataDir.isDirectory() || !metaDataDir.canRead()) {
+ throw new RuntimeException(new StringBuilder(512)
+ .append("Path ").append(backupAndRecoveryPath)
+ .append(" is not a readable directory!").toString());
+ }
+ // get auth-token code
+ if (operationType.equalsIgnoreCase("recovery")) {
+ authToken = cli.getOptionValue(CliArgDef.AUTHTOKEN.longOpt);
+ if (TStringUtils.isBlank(authToken)) {
+ throw new Exception(CliArgDef.AUTHTOKEN.longOpt + " is not allowed blank!");
+ }
+ }
+ return true;
+ }
+
+ public static void main(final String[] args) {
+ CliMetaDataBRU cliMetaDataBRU = new CliMetaDataBRU();
+ StringBuilder strBuff = new StringBuilder(512);
+ try {
+ boolean result = cliMetaDataBRU.processParams(args);
+ if (!result) {
+ throw new Exception("Parse parameters failure!");
+ }
+ cliMetaDataBRU.processCommands(strBuff);
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ logger.error(ex.getMessage());
+ cliMetaDataBRU.help();
+ }
+ }
+
+ public void processCommands(StringBuilder strBuff) {
+ if (operationType.equals("backup")) {
+ backupMetaData(strBuff);
+ } else {
+ recoveryMetaData(strBuff);
+ }
+ }
+
+ /**
+ * Backup meta data from Masters
+ * The Master currently has 6 types of metadata.
+ * First, query the metadata from the Master, then save it to the specified storage location,
+ * and finally read the saved result and compare it with the queried data to confirm that
+ * the saved content is consistent with the query result.
+ *
+ * @param strBuff the string buffer
+ */
+ private void backupMetaData(StringBuilder strBuff) {
+ logger.info("Backup meta-data begin: start query data from remote, total 6 items");
+ // a. cluster setting
+ logger.info("No 1.1-6: download cluster setting configurations from Master");
+ Map<String, ClusterSettingEntity> clusterSettingMap = getClusterConfInfo(strBuff);
+ if (clusterSettingMap == null) {
+ logger.error(" download cluster setting configurations failure!");
+ return;
+ }
+ logger.info("No 1.2-6: store cluster setting configurations to local file");
+ storeObjectToFile(clusterSettingMap, backupAndRecoveryPath, "clusterConfig");
+ logger.info("No 1.3-6: verify configurations ");
+ Map<String, ClusterSettingEntity> storedSettingMap =
+ (Map<String, ClusterSettingEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "clusterConfig");
+ if (storedSettingMap == null) {
+ logger.error(strBuff
+ .append(" local cluster setting configurations read failure!, clusterConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ if (storedSettingMap.size() != clusterSettingMap.size()) {
+ logger.error(" verify failure, stored clusterConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, ClusterSettingEntity> qryEntry : clusterSettingMap.entrySet()) {
+ ClusterSettingEntity targetEntity = storedSettingMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored clusterConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ }
+ // b. broker configurations
+ logger.info("No 2.1-6: download broker configurations from Master");
+ Map<String, BrokerConfEntity> brokerConfMap = getBrokerConfInfos(strBuff);
+ if (brokerConfMap == null) {
+ logger.error(" download broker configurations is null!");
+ return;
+ }
+ logger.info("No 2.2-6: store broker configurations to local file");
+ storeObjectToFile(brokerConfMap, backupAndRecoveryPath, "brokerConfig");
+ logger.info("No 2.3-6: verify configurations");
+ Map<String, BrokerConfEntity> storedBrokerConfigMap =
+ (Map<String, BrokerConfEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "brokerConfig");
+ if (storedBrokerConfigMap == null) {
+ logger.error(strBuff
+ .append(" local broker configurations read failure!, brokerConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ if (storedBrokerConfigMap.size() != brokerConfMap.size()) {
+ logger.error(" verify failure, stored brokerConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, BrokerConfEntity> qryEntry : brokerConfMap.entrySet()) {
+ BrokerConfEntity targetEntity = storedBrokerConfigMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored brokerConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ return;
+ }
+ }
+ // c. topic control configurations
+ logger.info("No 3.1-6: download topic control configurations from Master");
+ Map<String, TopicCtrlEntity> topicCtrlMap = getTopicControlInfos(strBuff);
+ if (topicCtrlMap == null) {
+ logger.error(" download topic control configure is null!");
+ return;
+ }
+ logger.info("No 3.2-6: store topic control configurations to local file");
+ storeObjectToFile(topicCtrlMap, backupAndRecoveryPath, "topicControlConfig");
+ logger.info("No 3.3-6: verify configurations");
+ Map<String, TopicCtrlEntity> storedTopicCtrlMap =
+ (Map<String, TopicCtrlEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "topicControlConfig");
+ if (storedTopicCtrlMap == null) {
+ logger.error(strBuff
+ .append(" local topic control configurations read failure!, topicControlConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ if (storedTopicCtrlMap.size() != topicCtrlMap.size()) {
+ logger.error(" verify failure, stored topicControlConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, TopicCtrlEntity> qryEntry : topicCtrlMap.entrySet()) {
+ TopicCtrlEntity targetEntity = storedTopicCtrlMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored topicControlConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ return;
+ }
+ }
+ // d. topic deploy configurations
+ logger.info("No 4.1-6: download topic deploy configurations from Master");
+ Map<String, TopicDeployEntity> topicDeployMap = getTopicDeployInfos(strBuff);
+ if (topicDeployMap == null) {
+ logger.error(" download topic deploy configurations is null!");
+ return;
+ }
+ logger.info("No 4.2-6: store topic deploy configurations to local file");
+ storeObjectToFile(topicDeployMap, backupAndRecoveryPath, "topicDeployConfig");
+ logger.info("No 4.3-6: verify configurations");
+ Map<String, TopicDeployEntity> storedTopicDeployMap =
+ (Map<String, TopicDeployEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "topicDeployConfig");
+ if (storedTopicDeployMap == null) {
+ logger.error(strBuff
+ .append(" local topic deploy configurations read failure!, topicControlConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ if (storedTopicDeployMap.size() != topicDeployMap.size()) {
+ logger.error(" verify failure, stored topicDeployConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, TopicDeployEntity> qryEntry : topicDeployMap.entrySet()) {
+ TopicDeployEntity targetEntity = storedTopicDeployMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored topicDeployConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ return;
+ }
+ }
+ // e. group control configurations
+ logger.info("No 5.1-6: download group control configurations from Master");
+ Map<String, GroupResCtrlEntity> groupCtrlMap = getGroupResCtrlInfos(strBuff);
+ if (groupCtrlMap == null) {
+ logger.error(" download group control configurations is null!");
+ return;
+ }
+ logger.info("No 5.2-6: store group control configurations to local file");
+ storeObjectToFile(groupCtrlMap, backupAndRecoveryPath, "groupCtrlConfig");
+ logger.info("No 5.3-6: verify configurations");
+ Map<String, GroupResCtrlEntity> storedGroupCtrlMap =
+ (Map<String, GroupResCtrlEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "groupCtrlConfig");
+ if (storedGroupCtrlMap == null) {
+ logger.error(strBuff
+ .append(" local group control configurations read failure!, groupCtrlConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ if (storedGroupCtrlMap.size() != groupCtrlMap.size()) {
+ logger.error(" verify failure, stored groupCtrlConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, GroupResCtrlEntity> qryEntry : groupCtrlMap.entrySet()) {
+ GroupResCtrlEntity targetEntity = storedGroupCtrlMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored groupCtrlConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ return;
+ }
+ }
+ // f. group consume control configurations
+ logger.info("No 6.1-6: download group consume configurations from Master");
+ Map<String, GroupConsumeCtrlEntity> groupCsmInfoMap = getGroupCsmCtrlInfos(strBuff);
+ if (groupCsmInfoMap == null) {
+ logger.error(" download group consume configurations is null!");
+ return;
+ }
+ logger.info("No 6.2-6: store group consume configurations to local file");
+ storeObjectToFile(groupCsmInfoMap, backupAndRecoveryPath, "groupConsumeConfig");
+ logger.info("No 6.3-6: verify configurations");
+ Map<String, GroupConsumeCtrlEntity> storedGroupCsmMap =
+ (Map<String, GroupConsumeCtrlEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "groupConsumeConfig");
+ if (storedGroupCsmMap == null) {
+ logger.error(strBuff
+ .append(" local group consume configurations read failure!, groupConsumeConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ if (storedGroupCsmMap.size() != groupCsmInfoMap.size()) {
+ logger.error(" verify failure, stored groupConsumeConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, GroupConsumeCtrlEntity> qryEntry : groupCsmInfoMap.entrySet()) {
+ GroupConsumeCtrlEntity targetEntity = storedGroupCsmMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored groupConsumeConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ return;
+ }
+ }
+ logger.info("Backup finished!");
+ }
+
+ /**
+ * Recovery meta data to Masters
+ * For each type of metadata, they follow the same set of processing procedures:
+ * read metadata locally, write the data to the Master, read the written data from the Master,
+ * and then compare the data read from the Master and the locally stored data,
+ * determine whether the two are consistent
+ *
+ * @param strBuff the string buffer
+ */
+ private void recoveryMetaData(StringBuilder strBuff) {
+ logger.info("Recovery meta-data begin: start read data from local path");
+ // a. read cluster configurations
+ logger.info("No 1.1-6 recovery cluster configurations");
+ Map<String, ClusterSettingEntity> storedSettingMap =
+ (Map<String, ClusterSettingEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "clusterConfig");
+ if (storedSettingMap == null) {
+ logger.error(strBuff
+ .append(" local cluster setting configurations read failure!, clusterConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 1.2-6 upload cluster configurations to master");
+ ProcessResult result = new ProcessResult();
+ if (!writeClusterConfInfo(storedSettingMap, strBuff, result)) {
+ logger.error(strBuff.append(" write cluster configurations failure!")
+ .append(result.getErrMsg()).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 1.3-6 read restored cluster configurations from master");
+ Map<String, ClusterSettingEntity> clusterSettingMap = getClusterConfInfo(strBuff);
+ if (clusterSettingMap == null) {
+ logger.error(" read restored cluster setting configurations failure!");
+ return;
+ }
+ logger.info("No 1.4-6: verify configurations");
+ if (storedSettingMap.size() != clusterSettingMap.size()) {
+ logger.error(" verify failure, restored clusterConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, ClusterSettingEntity> qryEntry : storedSettingMap.entrySet()) {
+ ClusterSettingEntity targetEntity = clusterSettingMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored clusterConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ return;
+ }
+ }
+ // b. read cluster configurations
+ logger.info("No 2.1-6 recovery broker configurations");
+ Map<String, BrokerConfEntity> storedBrokerConfigMap =
+ (Map<String, BrokerConfEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "brokerConfig");
+ if (storedBrokerConfigMap == null) {
+ logger.error(strBuff
+ .append(" local broker configurations read failure!, brokerConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 2.2-6 upload broker configurations to master");
+ if (!writeBrokerConfInfo(storedBrokerConfigMap, strBuff, result)) {
+ logger.error(strBuff.append(" write broker configurations failure!")
+ .append(result.getErrMsg()).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 2.3-6 read restored broker configurations from master");
+ Map<String, BrokerConfEntity> brokerConfMap = getBrokerConfInfos(strBuff);
+ if (brokerConfMap == null) {
+ logger.error(" read restored broker configurations failure!");
+ return;
+ }
+ logger.info("No 2.4-6: verify configurations");
+ if (brokerConfMap.size() != storedBrokerConfigMap.size()) {
+ logger.error(" verify failure, restored brokerConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, BrokerConfEntity> qryEntry : storedBrokerConfigMap.entrySet()) {
+ BrokerConfEntity targetEntity = brokerConfMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored brokerConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ return;
+ }
+ }
+ // c. read topic control configurations
+ logger.info("No 3.1-6 recovery topic control configurations");
+ Map<String, TopicCtrlEntity> storedTopicCtrlMap =
+ (Map<String, TopicCtrlEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "topicControlConfig");
+ if (storedTopicCtrlMap == null) {
+ logger.error(strBuff
+ .append(" local topic control configurations read failure!, topicControlConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 3.2-6 upload topic control configurations to master");
+ if (!writeTopicCtrlInfo(storedTopicCtrlMap, strBuff, result)) {
+ logger.error(strBuff.append(" write topic control configurations failure!")
+ .append(result.getErrMsg()).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 3.3-6 read restored topic control configurations from master");
+ Map<String, TopicCtrlEntity> topicCtrlMap = getTopicControlInfos(strBuff);
+ if (topicCtrlMap == null) {
+ logger.error(" download topic control configure is null!");
+ return;
+ }
+ logger.info("No 3.4-6: verify configurations");
+ if (topicCtrlMap.size() != storedTopicCtrlMap.size()) {
+ logger.error(" verify failure, restored topicControlConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, TopicCtrlEntity> qryEntry : storedTopicCtrlMap.entrySet()) {
+ TopicCtrlEntity targetEntity = topicCtrlMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored topicControlConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ return;
+ }
+ }
+ // d. read topic deploy configurations
+ logger.info("No 4.1-6 recovery topic deploy configurations");
+ Map<String, TopicDeployEntity> storedTopicDeployMap =
+ (Map<String, TopicDeployEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "topicDeployConfig");
+ if (storedTopicDeployMap == null) {
+ logger.error(strBuff
+ .append(" local topic deploy configurations read failure!, topicDeployConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 4.2-6 upload topic deploy configurations to master");
+ if (!writeTopicDeployInfo(storedTopicDeployMap, strBuff, result)) {
+ logger.error(strBuff.append(" write topic deploy configurations failure!")
+ .append(result.getErrMsg()).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 4.3-6 read restored topic deploy configurations from master");
+ Map<String, TopicDeployEntity> topicDeployMap = getTopicDeployInfos(strBuff);
+ if (topicDeployMap == null) {
+ logger.error(" download topic deploy configurations is null!");
+ return;
+ }
+ logger.info("No 4.4-6: verify configurations");
+ if (topicDeployMap.size() != storedTopicDeployMap.size()) {
+ logger.error(" verify failure, stored topicDeployConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, TopicDeployEntity> qryEntry : storedTopicDeployMap.entrySet()) {
+ TopicDeployEntity targetEntity = topicDeployMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored topicDeployConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ return;
+ }
+ }
+ // e. group control configurations
+ logger.info("No 5.1-6 recovery group control configurations");
+ Map<String, GroupResCtrlEntity> storedGroupCtrlMap =
+ (Map<String, GroupResCtrlEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "groupCtrlConfig");
+ if (storedGroupCtrlMap == null) {
+ logger.error(strBuff
+ .append(" local group control configurations read failure!, groupCtrlConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 5.2-6 upload group control configurations to master");
+ if (!writeGroupResCtrlInfo(storedGroupCtrlMap, strBuff, result)) {
+ logger.error(strBuff.append(" write group control configurations failure!")
+ .append(result.getErrMsg()).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 5.3-6 read restored group control configurations from master");
+ Map<String, GroupResCtrlEntity> groupCtrlMap = getGroupResCtrlInfos(strBuff);
+ if (groupCtrlMap == null) {
+ logger.error(" download group control configurations is null!");
+ return;
+ }
+ logger.info("No 5.4-6: verify configurations");
+ if (groupCtrlMap.size() != storedGroupCtrlMap.size()) {
+ logger.error(" verify failure, stored groupCtrlConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, GroupResCtrlEntity> qryEntry : storedGroupCtrlMap.entrySet()) {
+ GroupResCtrlEntity targetEntity = groupCtrlMap.get(qryEntry.getKey());
+ if (targetEntity == null || !targetEntity.isDataEquals(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored groupCtrlConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+ return;
+ }
+ }
+ // f. group consume control configurations
+ logger.info("No 6.1-6 recovery group consume control configurations");
+ Map<String, GroupConsumeCtrlEntity> storedGroupCsmMap =
+ (Map<String, GroupConsumeCtrlEntity>) readObjectFromFile(
+ backupAndRecoveryPath, "groupConsumeConfig");
+ if (storedGroupCsmMap == null) {
+ logger.error(strBuff
+ .append(" local group consume configurations read failure!, groupConsumeConfig in ")
+ .append(backupAndRecoveryPath).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 6.2-6 upload group consume control configurations to master");
+ if (!writeGroupCsmInfo(storedGroupCsmMap, strBuff, result)) {
+ logger.error(strBuff.append(" write group consume control configurations failure!")
+ .append(result.getErrMsg()).toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ }
+ logger.info("No 6.3-6 read restored group consume control configurations from master");
+ Map<String, GroupConsumeCtrlEntity> groupCsmInfoMap = getGroupCsmCtrlInfos(strBuff);
+ if (groupCsmInfoMap == null) {
+ logger.error(" download group consume configurations is null!");
+ return;
+ }
+ logger.info("No 6.4-6: verify configurations");
+ if (groupCsmInfoMap.size() != storedGroupCsmMap.size()) {
+ logger.error(" verify failure, stored groupConsumeConfig size not equal!");
+ return;
+ }
+ for (Map.Entry<String, GroupConsumeCtrlEntity> qryEntry : storedGroupCsmMap.entrySet()) {
+ GroupConsumeCtrlEntity targetEntity = groupCsmInfoMap.get(qryEntry.getKey());
+ if (targetEntity == null
+ || !targetEntity.isDataEquals(qryEntry.getValue())
+ || !targetEntity.isMatched(qryEntry.getValue())) {
+ logger.error(strBuff
+ .append(" verify failure, stored groupCtrlConfig value not equal!")
+ .append(" data in server is ").append(qryEntry.getValue().toString())
+ .append(", data stored is ").append((targetEntity == null)
+ ? null : targetEntity.toString()).toString());
+
+ logger.error(" verify failure, stored groupConsumeConfig value not equal!");
+ return;
+ }
+ }
+ logger.info("Recovery finished!");
+
+ }
+
+ /**
+ * Query cluster setting configurations.
+ *
+ * @param strBuff the string buffer
+ * @return the query result, null if query failure
+ */
+ private Map<String, ClusterSettingEntity> getClusterConfInfo(StringBuilder strBuff) {
+ // http://127.0.0.1:8080/webapi.htm?method=admin_query_cluster_default_setting
+ JsonObject jsonRes = qryDataFromMaster(
+ "admin_query_cluster_default_setting", new HashMap<>(), strBuff);
+ // check return result
+ if (!jsonRes.get("result").getAsBoolean()) {
+ logger.info(strBuff.append("Query cluster configurations info failure:")
+ .append(jsonRes.get("result").getAsString()).toString());
+ strBuff.delete(0, strBuff.length());
+ return null;
+ }
+ Map<String, ClusterSettingEntity> clusterSettingMap = new HashMap<>();
+ JsonArray clusterInfoList = jsonRes.get("data").getAsJsonArray();
+ for (int i = 0; i < clusterInfoList.size(); i++) {
+ JsonObject jsonItem = clusterInfoList.get(i).getAsJsonObject();
+ if (jsonItem == null) {
+ continue;
+ }
+ try {
+ // get base information
+ BaseEntity baseEntity = getBaseEntityInfo(jsonItem);
+ // get topic default configurations
+ TopicPropGroup defTopicProps = getTopicProps(jsonItem);
+ // get broker default configurations
+ int brokerPort = jsonItem.get("brokerPort").getAsInt();
+ int brokerTlsPort = jsonItem.get("brokerTLSPort").getAsInt();
+ int brokerWebPort = jsonItem.get("brokerWebPort").getAsInt();
+ int maxMsgSizeInMB = jsonItem.get("maxMsgSizeInMB").getAsInt();
+ int qryPriorityId = jsonItem.get("qryPriorityId").getAsInt();
+ boolean flowCtrlEnable = jsonItem.get("flowCtrlEnable").getAsBoolean();
+ int flowRuleCnt = jsonItem.get("flowCtrlRuleCount").getAsInt();
+ JsonArray flowCtrlInfoArray = jsonItem.get("flowCtrlInfo").getAsJsonArray();
+ String flowCtrlInfoStr = flowCtrlInfoArray.toString();
+ // build cluster setting entity
+ ClusterSettingEntity settingEntity =
+ new ClusterSettingEntity(baseEntity);
+ settingEntity.updModifyInfo(baseEntity.getDataVerId(),
+ brokerPort, brokerTlsPort, brokerWebPort, maxMsgSizeInMB,
+ qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfoStr, defTopicProps);
+ clusterSettingMap.put(settingEntity.getRecordKey(), settingEntity);
+ } catch (Throwable e) {
+ logger.error(strBuff.append("Parse cluster configurations(")
+ .append(jsonItem.toString()).append(") throw exception ")
+ .append(e.toString()).toString());
+ strBuff.delete(0, strBuff.length());
+ throw e;
+ }
+ }
+ return clusterSettingMap;
+ }
+
+ /**
+ * Write cluster setting configurations to Master
+ *
+ * @param clusterConfMap the cluster configures that needs to be stored
+ * @param strBuff the string buffer
+ * @param result the process result
+ * @return the process result
+ */
+ private boolean writeClusterConfInfo(Map<String, ClusterSettingEntity> clusterConfMap,
+ StringBuilder strBuff, ProcessResult result) {
+ if (clusterConfMap.isEmpty()) {
+ return true;
+ }
+ int count = 0;
+ Map<String, String> inParamMap = new HashMap<>();
+ for (ClusterSettingEntity entity : clusterConfMap.values()) {
+ // build cluster setting configurations
+ entity.getConfigureInfo(inParamMap, true);
+ if (!writeDataToMaster("admin_set_cluster_default_setting",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ inParamMap.clear();
+ }
+ return true;
+ }
+
+ /**
+ * Query broker configurations
+ *
+ * @param strBuff the string buffer
+ * @return the query result, null if query failure
+ */
+ private Map<String, BrokerConfEntity> getBrokerConfInfos(StringBuilder strBuff) {
+ // http://127.0.0.1:8080/webapi.htm?method=admin_query_broker_configure
+ JsonObject jsonRes = qryDataFromMaster(
+ "admin_query_broker_configure", new HashMap<>(), strBuff);
+ // check return result
+ if (!jsonRes.get("result").getAsBoolean()) {
+ logger.info(strBuff.append("Query broker configurations info failure:")
+ .append(jsonRes.get("result").getAsString()).toString());
+ strBuff.delete(0, strBuff.length());
+ return null;
+ }
+ Map<String, BrokerConfEntity> brokerInfoMap = new HashMap<>();
+ JsonArray jsonBrokerInfoList = jsonRes.get("data").getAsJsonArray();
+ for (int i = 0; i < jsonBrokerInfoList.size(); i++) {
+ JsonObject jsonItem = jsonBrokerInfoList.get(i).getAsJsonObject();
+ if (jsonItem == null) {
+ continue;
+ }
+ try {
+ // get base information
+ BaseEntity baseEntity = getBaseEntityInfo(jsonItem);
+ // get broker configurations
+ int brokerId = jsonItem.get("brokerId").getAsInt();
+ String brokerIp = jsonItem.get("brokerIp").getAsString();
+ BrokerConfEntity brokerConfEntity =
+ new BrokerConfEntity(baseEntity, brokerId, brokerIp);
+ // get topic configurations
+ TopicPropGroup topicPropGroup = getTopicProps(jsonItem);
+ // get broker configurations
+ int brokerPort = jsonItem.get("brokerPort").getAsInt();
+ int brokerTlsPort = jsonItem.get("brokerTLSPort").getAsInt();
+ int brokerWebPort = jsonItem.get("brokerWebPort").getAsInt();
+ int regionId = jsonItem.get("regionId").getAsInt();
+ int groupId = jsonItem.get("groupId").getAsInt();
+ String statusInfo = jsonItem.get("manageStatus").getAsString();
+ ManageStatus mngStatus = ManageStatus.descOf(statusInfo);
+ // build broker configurations
+ brokerConfEntity.updModifyInfo(baseEntity.getDataVerId(),
+ brokerPort, brokerTlsPort, brokerWebPort, regionId,
+ groupId, mngStatus, topicPropGroup);
+ brokerInfoMap.put(brokerIp, brokerConfEntity);
+ } catch (Throwable e) {
+ logger.error(strBuff.append("Parse broker configurations(")
+ .append(jsonItem.toString()).append(") throw exception ")
+ .append(e.toString()).toString());
+ strBuff.delete(0, strBuff.length());
+ throw e;
+ }
+ }
+ return brokerInfoMap;
+ }
+
+ /**
+ * Write broker configurations to Master
+ *
+ * @param brokerConfigMap the broker configurations that needs to be stored
+ * @param strBuff the string buffer
+ * @param result the process result
+ * @return the process result
+ */
+ private boolean writeBrokerConfInfo(Map<String, BrokerConfEntity> brokerConfigMap,
+ StringBuilder strBuff, ProcessResult result) {
+ if (brokerConfigMap.isEmpty()) {
+ return true;
+ }
+ int count = 0;
+ Map<String, String> inParamMap = new HashMap<>();
+ for (BrokerConfEntity entity : brokerConfigMap.values()) {
+ if (count++ > 0) {
+ strBuff.append(",");
+ }
+ // build broker configurations in json format
+ entity.toWebJsonStr(strBuff, false, false, true, true);
+ if (strBuff.length() > maxDataLength
+ || count % TServerConstants.CFG_BATCH_BROKER_OPERATE_MAX_COUNT == 0) {
+ inParamMap.put("brokerJsonSet", "[" + strBuff.toString() + "]");
+ strBuff.delete(0, strBuff.length());
+ inParamMap.put("createUser", curOperator);
+ inParamMap.put("modifyUser", curOperator);
+ if (!writeDataToMaster("admin_batch_add_broker_configure",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ count = 0;
+ inParamMap.clear();
+ strBuff.delete(0, strBuff.length());
+ }
+ }
+ if (strBuff.length() > 0) {
+ inParamMap.put("brokerJsonSet", "[" + strBuff.toString() + "]");
+ strBuff.delete(0, strBuff.length());
+ inParamMap.put("createUser", curOperator);
+ inParamMap.put("modifyUser", curOperator);
+ if (!writeDataToMaster("admin_batch_add_broker_configure",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ strBuff.delete(0, strBuff.length());
+ }
+ return true;
+ }
+
+ /**
+ * Query topic control configurations
+ *
+ * @param strBuff the string buffer
+ * @return the query result, null if query failure
+ */
+ private Map<String, TopicCtrlEntity> getTopicControlInfos(StringBuilder strBuff) {
+ // http://127.0.0.1:8080/webapi.htm?method=admin_query_topic_control_info
+ JsonObject jsonRes = qryDataFromMaster(
+ "admin_query_topic_control_info", new HashMap<>(), strBuff);
+ // check return result
+ if (!jsonRes.get("result").getAsBoolean()) {
+ logger.info(strBuff.append("Query topic control configurations info failure:")
+ .append(jsonRes.get("result").getAsString()).toString());
+ strBuff.delete(0, strBuff.length());
+ return null;
+ }
+ Map<String, TopicCtrlEntity> topicCtrlMap = new HashMap<>();
+ JsonArray jsonTopicCtrlList = jsonRes.get("data").getAsJsonArray();
+ for (int i = 0; i < jsonTopicCtrlList.size(); i++) {
+ JsonObject jsonItem = jsonTopicCtrlList.get(i).getAsJsonObject();
+ if (jsonItem == null) {
+ continue;
+ }
+ try {
+ // get base information
+ BaseEntity baseEntity = getBaseEntityInfo(jsonItem);
+ // get topic control configurations
+ String topicName = jsonItem.get("topicName").getAsString();
+ int topicNameId = jsonItem.get("topicNameId").getAsInt();
+ boolean enableAuthCtrl = jsonItem.get("enableAuthControl").getAsBoolean();
+ int maxMsgSizeInMB = jsonItem.get("maxMsgSizeInMB").getAsInt();
+ // build topic control entity
+ TopicCtrlEntity topicCtrlEntity = new TopicCtrlEntity(baseEntity, topicName);
+ topicCtrlEntity.updModifyInfo(baseEntity.getDataVerId(),
+ topicNameId, maxMsgSizeInMB, enableAuthCtrl);
+ topicCtrlMap.put(topicCtrlEntity.getTopicName(), topicCtrlEntity);
+ } catch (Throwable e) {
+ logger.error(strBuff.append("Parse topic control configurations(")
+ .append(jsonItem.toString()).append(") throw exception ")
+ .append(e.toString()).toString());
+ strBuff.delete(0, strBuff.length());
+ throw e;
+ }
+ }
+ return topicCtrlMap;
+ }
+
+ /**
+ * Write topic control configurations to Master
+ *
+ * @param topicCtrlMap the topic control configurations that needs to be stored
+ * @param strBuff the string buffer
+ * @param result the process result
+ * @return the process result
+ */
+ private boolean writeTopicCtrlInfo(Map<String, TopicCtrlEntity> topicCtrlMap,
+ StringBuilder strBuff, ProcessResult result) {
+ if (topicCtrlMap.isEmpty()) {
+ return true;
+ }
+ int count = 0;
+ Map<String, String> inParamMap = new HashMap<>();
+ for (TopicCtrlEntity entity : topicCtrlMap.values()) {
+ if (count++ > 0) {
+ strBuff.append(",");
+ }
+ // build topic control configurations in json format
+ entity.toWebJsonStr(strBuff, true, true);
+ if (strBuff.length() > maxDataLength
+ || count % TServerConstants.CFG_BATCH_BROKER_OPERATE_MAX_COUNT == 0) {
+ inParamMap.put("topicCtrlJsonSet", "[" + strBuff.toString() + "]");
+ strBuff.delete(0, strBuff.length());
+ inParamMap.put("createUser", curOperator);
+ inParamMap.put("modifyUser", curOperator);
+ if (!writeDataToMaster("admin_batch_add_topic_control_info",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ count = 0;
+ inParamMap.clear();
+ strBuff.delete(0, strBuff.length());
+ }
+ }
+ if (strBuff.length() > 0) {
+ inParamMap.put("topicCtrlJsonSet", "[" + strBuff.toString() + "]");
+ strBuff.delete(0, strBuff.length());
+ inParamMap.put("createUser", curOperator);
+ inParamMap.put("modifyUser", curOperator);
+ if (!writeDataToMaster("admin_batch_add_topic_control_info",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ strBuff.delete(0, strBuff.length());
+ }
+ return true;
+ }
+
+ /**
+ * Query topic deploy configurations
+ *
+ * @param strBuff the string buffer
+ * @return the query result, null if query failure
+ */
+ private Map<String, TopicDeployEntity> getTopicDeployInfos(StringBuilder strBuff) {
+ // http://127.0.0.1:8080/webapi.htm?method=admin_query_topic_deploy_info
+ JsonObject jsonRes = qryDataFromMaster(
+ "admin_query_topic_deploy_info", new HashMap<>(), strBuff);
+ // check return result
+ if (!jsonRes.get("result").getAsBoolean()) {
+ logger.info(strBuff.append("Query topic deploy configurations info failure:")
+ .append(jsonRes.get("result").getAsString()).toString());
+ strBuff.delete(0, strBuff.length());
+ return null;
+ }
+ Map<String, TopicDeployEntity> topicDeployMap = new HashMap<>();
+ JsonArray jsonTopicDeployList = jsonRes.get("data").getAsJsonArray();
+ for (int i = 0; i < jsonTopicDeployList.size(); i++) {
+ JsonObject jsonItem = jsonTopicDeployList.get(i).getAsJsonObject();
+ if (jsonItem == null) {
+ continue;
+ }
+ try {
+ // get base information
+ BaseEntity baseEntity = getBaseEntityInfo(jsonItem);
+ // get topic configurations
+ TopicPropGroup topicPropGroup = getTopicProps(jsonItem);
+ // get topic deploy configurations
+ String topicName = jsonItem.get("topicName").getAsString();
+ int brokerId = jsonItem.get("brokerId").getAsInt();
+ int topicNameId = jsonItem.get("topicNameId").getAsInt();
+ String brokerIp = jsonItem.get("brokerIp").getAsString();
+ int brokerPort = jsonItem.get("brokerPort").getAsInt();
+ int topicStatusId = jsonItem.get("topicStatusId").getAsInt();
+ TopicStatus deployStatus = TopicStatus.valueOf(topicStatusId);
+ // build topic deploy entity
+ TopicDeployEntity topicDeployEntity =
+ new TopicDeployEntity(baseEntity, brokerId, topicName);
+ topicDeployEntity.updModifyInfo(baseEntity.getDataVerId(),
+ topicNameId, brokerPort, brokerIp, deployStatus,
+ topicPropGroup);
+ topicDeployMap.put(topicDeployEntity.getRecordKey(), topicDeployEntity);
+ } catch (Throwable e) {
+ logger.error(strBuff.append("Parse topic deploy configurations(")
+ .append(jsonItem.toString()).append(") throw exception ")
+ .append(e.toString()).toString());
+ strBuff.delete(0, strBuff.length());
+ throw e;
+ }
+ }
+ return topicDeployMap;
+ }
+
+ /**
+ * Write topic deploy configurations to Master
+ *
+ * @param topicDeployMap the topic deploy configurations that needs to be stored
+ * @param strBuff the string buffer
+ * @param result the process result
+ * @return the process result
+ */
+ private boolean writeTopicDeployInfo(Map<String, TopicDeployEntity> topicDeployMap,
+ StringBuilder strBuff, ProcessResult result) {
+ if (topicDeployMap.isEmpty()) {
+ return true;
+ }
+ int count = 0;
+ Map<String, String> inParamMap = new HashMap<>();
+ for (TopicDeployEntity entity : topicDeployMap.values()) {
+ if (count++ > 0) {
+ strBuff.append(",");
+ }
+ // build topic deploy configurations in json format
+ entity.toWebJsonStr(strBuff, true, true);
+ if (strBuff.length() > maxDataLength
+ || count % TServerConstants.CFG_BATCH_BROKER_OPERATE_MAX_COUNT == 0) {
+ inParamMap.put("topicJsonSet", "[" + strBuff.toString() + "]");
+ strBuff.delete(0, strBuff.length());
+ inParamMap.put("createUser", curOperator);
+ inParamMap.put("modifyUser", curOperator);
+ if (!writeDataToMaster("admin_bath_add_topic_deploy_info",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ count = 0;
+ inParamMap.clear();
+ strBuff.delete(0, strBuff.length());
+ }
+ }
+ if (strBuff.length() > 0) {
+ inParamMap.put("topicJsonSet", "[" + strBuff.toString() + "]");
+ strBuff.delete(0, strBuff.length());
+ inParamMap.put("createUser", curOperator);
+ inParamMap.put("modifyUser", curOperator);
+ if (!writeDataToMaster("admin_bath_add_topic_deploy_info",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ strBuff.delete(0, strBuff.length());
+ }
+ return true;
+ }
+
+ /**
+ * Query group resource control configurations
+ *
+ * @param strBuff the string buffer
+ * @return the query result, null if query failure
+ */
+ private Map<String, GroupResCtrlEntity> getGroupResCtrlInfos(StringBuilder strBuff) {
+ // http://127.0.0.1:8080/webapi.htm?method=admin_query_group_resctrl_info
+ JsonObject jsonRes = qryDataFromMaster(
+ "admin_query_group_resctrl_info", new HashMap<>(), strBuff);
+ // check return result
+ if (!jsonRes.get("result").getAsBoolean()) {
+ logger.info(strBuff.append("Query group resource control configurations info failure:")
+ .append(jsonRes.get("result").getAsString()).toString());
+ strBuff.delete(0, strBuff.length());
+ return null;
+ }
+ Map<String, GroupResCtrlEntity> groupResCtrlMap = new HashMap<>();
+ JsonArray jsonGroupResCtrlList = jsonRes.get("data").getAsJsonArray();
+ for (int i = 0; i < jsonGroupResCtrlList.size(); i++) {
+ JsonObject jsonItem = jsonGroupResCtrlList.get(i).getAsJsonObject();
+ if (jsonItem == null) {
+ continue;
+ }
+ try {
+ // get base information
+ BaseEntity baseEntity = getBaseEntityInfo(jsonItem);
+ // get group resource control configurations
+ String groupName = jsonItem.get("groupName").getAsString();
+ boolean resCheckEnable = jsonItem.get("resCheckEnable").getAsBoolean();
+ int alwdBCRate = jsonItem.get("alwdBrokerClientRate").getAsInt();
+ int qryPriorityId = jsonItem.get("qryPriorityId").getAsInt();
+ boolean flowCtrlEnable = jsonItem.get("flowCtrlEnable").getAsBoolean();
+ int flowCtrlRuleCount = jsonItem.get("flowCtrlRuleCount").getAsInt();
+ JsonArray flowCtrlInfoArray = jsonItem.get("flowCtrlInfo").getAsJsonArray();
+ String flowCtrlInfoStr = flowCtrlInfoArray.toString();
+ // build group resource control entity
+ GroupResCtrlEntity groupResCtrlEntity =
+ new GroupResCtrlEntity(baseEntity, groupName);
+ groupResCtrlEntity.updModifyInfo(baseEntity.getDataVerId(),
+ resCheckEnable, alwdBCRate, qryPriorityId,
+ flowCtrlEnable, flowCtrlRuleCount, flowCtrlInfoStr);
+ groupResCtrlMap.put(groupResCtrlEntity.getGroupName(), groupResCtrlEntity);
+ } catch (Throwable e) {
+ logger.error(strBuff.append("Parse group resource control configurations(")
+ .append(jsonItem.toString()).append(") throw exception ")
+ .append(e.toString()).toString());
+ strBuff.delete(0, strBuff.length());
+ throw e;
+ }
+ }
+ return groupResCtrlMap;
+ }
+
+ /**
+ * Write group resource control configurations to Master
+ *
+ * @param groupResCtrlMap the group resource control configurations that needs to be stored
+ * @param strBuff the string buffer
+ * @param result the process result
+ * @return the process result
+ */
+ private boolean writeGroupResCtrlInfo(Map<String, GroupResCtrlEntity> groupResCtrlMap,
+ StringBuilder strBuff, ProcessResult result) {
+ if (groupResCtrlMap.isEmpty()) {
+ return true;
+ }
+ int count = 0;
+ Map<String, String> inParamMap = new HashMap<>();
+ for (GroupResCtrlEntity entity : groupResCtrlMap.values()) {
+ if (count++ > 0) {
+ strBuff.append(",");
+ }
+ // build group resource control configurations in json format
+ entity.toWebJsonStr(strBuff, true, true);
+ if (strBuff.length() > maxDataLength
+ || count % TServerConstants.CFG_BATCH_BROKER_OPERATE_MAX_COUNT == 0) {
+ inParamMap.put("groupResCtrlJsonSet", "[" + strBuff.toString() + "]");
+ strBuff.delete(0, strBuff.length());
+ inParamMap.put("createUser", curOperator);
+ inParamMap.put("modifyUser", curOperator);
+ if (!writeDataToMaster("admin_batch_add_group_resctrl_info",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ count = 0;
+ inParamMap.clear();
+ strBuff.delete(0, strBuff.length());
+ }
+ }
+ if (strBuff.length() > 0) {
+ inParamMap.put("groupResCtrlJsonSet", "[" + strBuff.toString() + "]");
+ strBuff.delete(0, strBuff.length());
+ inParamMap.put("createUser", curOperator);
+ inParamMap.put("modifyUser", curOperator);
+ if (!writeDataToMaster("admin_batch_add_group_resctrl_info",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ strBuff.delete(0, strBuff.length());
+ }
+ return true;
+ }
+
+ /**
+ * Query group consume control configurations
+ *
+ * @param strBuff the string buffer
+ * @return the query result, null if query failure
+ */
+ private Map<String, GroupConsumeCtrlEntity> getGroupCsmCtrlInfos(StringBuilder strBuff) {
+ // http://127.0.0.1:8080/webapi.htm?method=admin_query_group_csmctrl_info
+ JsonObject jsonRes = qryDataFromMaster(
+ "admin_query_group_csmctrl_info", new HashMap<>(), strBuff);
+ // check return result
+ if (!jsonRes.get("result").getAsBoolean()) {
+ logger.info(strBuff.append("Query group consume control configurations info failure:")
+ .append(jsonRes.get("result").getAsString()).toString());
+ strBuff.delete(0, strBuff.length());
+ return null;
+ }
+ Map<String, GroupConsumeCtrlEntity> groupCsmCtrlMap = new HashMap<>();
+ JsonArray jsonGroupCsmCtrlList = jsonRes.get("data").getAsJsonArray();
+ for (int i = 0; i < jsonGroupCsmCtrlList.size(); i++) {
+ JsonObject jsonItem = jsonGroupCsmCtrlList.get(i).getAsJsonObject();
+ if (jsonItem == null) {
+ continue;
+ }
+ try {
+ // get base information
+ BaseEntity baseEntity = getBaseEntityInfo(jsonItem);
+ // get group consume control configurations
+ String groupName = jsonItem.get("groupName").getAsString();
+ String topicName = jsonItem.get("topicName").getAsString();
+ boolean consumeEnable = jsonItem.get("consumeEnable").getAsBoolean();
+ String disableCsmRsn = jsonItem.get("disableCsmRsn").getAsString();
+ boolean filterEnable = jsonItem.get("filterEnable").getAsBoolean();
+ String filterConds = jsonItem.get("filterConds").getAsString();
+ // build group consume control entity
+ GroupConsumeCtrlEntity groupCsmCtrlEntity =
+ new GroupConsumeCtrlEntity(baseEntity, groupName, topicName);
+ groupCsmCtrlEntity.updModifyInfo(baseEntity.getDataVerId(),
+ consumeEnable, disableCsmRsn, filterEnable, filterConds);
+ groupCsmCtrlMap.put(groupCsmCtrlEntity.getRecordKey(), groupCsmCtrlEntity);
+ } catch (Throwable e) {
+ logger.error(strBuff.append("Parse group consume control configurations(")
+ .append(jsonItem.toString()).append(") throw exception ")
+ .append(e.toString()).toString());
+ strBuff.delete(0, strBuff.length());
+ throw e;
+ }
+ }
+ return groupCsmCtrlMap;
+ }
+
+ /**
+ * Write group consume control configurations to Master
+ *
+ * @param groupCsmMap the group consume control configurations that needs to be stored
+ * @param strBuff the string buffer
+ * @param result the process result
+ * @return the process result
+ */
+ private boolean writeGroupCsmInfo(Map<String, GroupConsumeCtrlEntity> groupCsmMap,
+ StringBuilder strBuff, ProcessResult result) {
+ if (groupCsmMap.isEmpty()) {
+ return true;
+ }
+ int count = 0;
+ Map<String, String> inParamMap = new HashMap<>();
+ for (GroupConsumeCtrlEntity entity : groupCsmMap.values()) {
+ if (count++ > 0) {
+ strBuff.append(",");
+ }
+ // build group consume control configurations in json format
+ entity.toWebJsonStr(strBuff, true, true);
+ if (strBuff.length() > maxDataLength
+ || count % TServerConstants.CFG_BATCH_BROKER_OPERATE_MAX_COUNT == 0) {
+ inParamMap.put("groupCsmJsonSet", "[" + strBuff.toString() + "]");
+ strBuff.delete(0, strBuff.length());
+ inParamMap.put("createUser", curOperator);
+ inParamMap.put("modifyUser", curOperator);
+ if (!writeDataToMaster("admin_batch_add_group_csmctrl_info",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ count = 0;
+ inParamMap.clear();
+ strBuff.delete(0, strBuff.length());
+ }
+ }
+ if (strBuff.length() > 0) {
+ inParamMap.put("groupCsmJsonSet", "[" + strBuff.toString() + "]");
+ strBuff.delete(0, strBuff.length());
+ inParamMap.put("createUser", curOperator);
+ inParamMap.put("modifyUser", curOperator);
+ if (!writeDataToMaster("admin_batch_add_group_csmctrl_info",
+ authToken, inParamMap, strBuff, result)) {
+ return false;
+ }
+ strBuff.delete(0, strBuff.length());
+ }
+ return true;
+ }
+
+ /**
+ * Get BaseEntity information from JsonObject
+ *
+ * @param jsonItem the json object
+ * @return the BaseEntity object
+ */
+ private BaseEntity getBaseEntityInfo(JsonObject jsonItem) {
+ long dataVersionId = jsonItem.get("dataVersionId").getAsInt();
+ String createUser = jsonItem.get("createUser").getAsString();
+ String createDateStr = jsonItem.get("createDate").getAsString();
+ Date createDate = DateTimeConvertUtils.yyyyMMddHHmmss2date(createDateStr);
+ String modifyUser = jsonItem.get("modifyUser").getAsString();
+ String modifyDateStr = jsonItem.get("modifyDate").getAsString();
+ Date modifyDate = DateTimeConvertUtils.yyyyMMddHHmmss2date(modifyDateStr);
+ return new BaseEntity(dataVersionId, createUser, createDate, modifyUser, modifyDate);
+ }
+
+ /**
+ * Get TopicPropGroup information from JsonObject
+ *
+ * @param jsonItem the json object
+ * @return the TopicPropGroup object
+ */
+ private TopicPropGroup getTopicProps(JsonObject jsonItem) {
+ int numTopicStores = jsonItem.get("numTopicStores").getAsInt();
+ int numPartitions = jsonItem.get("numPartitions").getAsInt();
+ int unflushThreshold = jsonItem.get("unflushThreshold").getAsInt();
+ int unflushInterval = jsonItem.get("unflushInterval").getAsInt();
+ int unflushDataHold = jsonItem.get("unflushDataHold").getAsInt();
+ int memCacheMsgSizeInMB = jsonItem.get("memCacheMsgSizeInMB").getAsInt();
+ int memCacheMsgCntInK = jsonItem.get("memCacheMsgCntInK").getAsInt();
+ int memCacheFlushIntvl = jsonItem.get("memCacheFlushIntvl").getAsInt();
+ boolean acceptPublish = jsonItem.get("acceptPublish").getAsBoolean();
+ boolean acceptSubscribe = jsonItem.get("acceptSubscribe").getAsBoolean();
+ String deletePolicy = jsonItem.get("deletePolicy").getAsString();
+ int dataStoreType = jsonItem.get("dataStoreType").getAsInt();
+ String dataPath = jsonItem.get("dataPath").getAsString();
+ return new TopicPropGroup(numTopicStores, numPartitions,
+ unflushThreshold, unflushInterval, unflushDataHold,
+ memCacheMsgSizeInMB, memCacheMsgCntInK, memCacheFlushIntvl,
+ acceptPublish, acceptSubscribe, deletePolicy, dataStoreType, dataPath);
+ }
+
+ /**
+ * Query data from Masters
+ *
+ * @param method the method name
+ * @param inParamMap the parameter map
+ * @param strBuff the string buffer
+ * @return the query result of Json format, null if failure
+ */
+ private JsonObject qryDataFromMaster(String method,
+ Map<String, String> inParamMap,
+ StringBuilder strBuff) {
+ String visitUrl;
+ JsonObject jsonRes = null;
+ // call master nodes
+ for (String address : masterInfo.getNodeHostPortList()) {
+ visitUrl = strBuff.append("http://").append(address)
+ .append("/webapi.htm?method=").append(method).toString();
+ strBuff.delete(0, strBuff.length());
+ try {
+ jsonRes = HttpUtils.requestWebService(visitUrl, inParamMap);
+ if (jsonRes != null) {
+ // if get data, break cycle
+ break;
+ }
+ } catch (Throwable e) {
+ //
+ }
+ }
+ return jsonRes;
+ }
+
+ /**
+ * Write data to Masters
+ *
+ * @param method the method name
+ * @param opCode the operation code
+ * @param inParamMap the parameter map
+ * @param strBuff the string buffer
+ * @param result the process result
+ * @return success or failure
+ */
+ private boolean writeDataToMaster(String method, String opCode,
+ Map<String, String> inParamMap,
+ StringBuilder strBuff,
+ ProcessResult result) {
+ String visitUrl;
+ JsonObject jsonRes = null;
+ // call master nodes
+ for (String address : masterInfo.getNodeHostPortList()) {
+ visitUrl = strBuff.append("http://").append(address)
+ .append("/webapi.htm?method=").append(method)
+ .append("&confModAuthToken=").append(opCode).toString();
+ strBuff.delete(0, strBuff.length());
+ try {
+ jsonRes = HttpUtils.requestWebService(visitUrl, inParamMap);
+ if (jsonRes != null) {
+ // if get data, break cycle
+ break;
+ }
+ } catch (Throwable e) {
+ //
+ }
+ }
+ if (jsonRes == null) {
+ result.setFailResult(400, "Connect master failure!");
+ } else {
+ if (jsonRes.get("result").getAsBoolean()) {
+ result.setSuccResult();
+ } else {
+ result.setFailResult(jsonRes.get("errCode").getAsInt(),
+ jsonRes.get("errMsg").getAsString());
+ }
+ }
+ return result.isSuccess();
+ }
+
+ /**
+ * Read object data from specified file path
+ *
+ * @param configPath the stored file path
+ * @param fileName the stored file name
+ * @return the stored object
+ */
+ private Object readObjectFromFile(String configPath, String fileName) {
+ FileInputStream fis = null;
+ ObjectInputStream is = null;
+ try {
+ File file = null;
+ if (configPath.lastIndexOf(File.separator) != configPath.length() - 1) {
+ file = new File(configPath + File.separator + fileName + ".conf");
+ } else {
+ file = new File(configPath + fileName + ".conf");
+ }
+ if (file.exists()) {
+ fis = new FileInputStream(file);
+ is = new ObjectInputStream(fis);
+ Object entry = is.readObject();
+ fis.close();
+ return entry;
+ } else {
+ return null;
+ }
+ } catch (Throwable e1) {
+ logger.error("store " + fileName + " exception", e1);
+ return null;
+ } finally {
+ if (fis != null) {
+ try {
+ fis.close();
+ } catch (Throwable e2) {
+ //
+ }
+ }
+ }
+ }
+
+ /**
+ * Write object data to specified file path
+ *
+ * @param storeObj the object to be stored
+ * @param configPath the stored file path
+ * @param fileName the stored file name
+ */
+ private void storeObjectToFile(Object storeObj, String configPath, String fileName) {
+ FileOutputStream fos = null;
+ ObjectOutputStream p = null;
+ try {
+ File file = null;
+ if (configPath.lastIndexOf(File.separator) != configPath.length() - 1) {
+ file = new File(configPath + File.separator + fileName + ".conf");
+ } else {
+ file = new File(configPath + fileName + ".conf");
+ }
+ logger.info("Store address is " + file.getAbsolutePath());
+ if (!file.getParentFile().exists()) {
+ file.getParentFile().mkdir();
+ }
+ if (!file.exists()) {
+ file.createNewFile();
+ }
+ fos = new FileOutputStream(file);
+ p = new ObjectOutputStream(fos);
+ p.writeObject(storeObj);
+ p.flush();
+ } catch (Throwable e) {
+ logger.warn("store " + fileName + " exception", e);
+ } finally {
+ if (fos != null) {
+ try {
+ fos.close();
+ } catch (Throwable e2) {
+ //
+ }
+ }
+ }
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntityTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntityTest.java
index c55038c..342b2b8 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntityTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntityTest.java
@@ -18,6 +18,8 @@
package org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
import org.apache.inlong.tubemq.server.common.TServerConstants;
@@ -152,4 +154,47 @@ public class BaseEntityTest {
Assert.assertTrue(baseEntity8.isMatched(baseEntity9));
}
+ @Test
+ public void getMapValuesTest() {
+ Map<String, String> paramMap = new HashMap<>();
+ // case 1
+ BaseEntity entity1 = new BaseEntity();
+ entity1.getConfigureInfo(paramMap, true);
+ Assert.assertEquals(paramMap.size(), 0);
+ paramMap.clear();
+ // case 2
+ String createUser = "creater";
+ String dateStr1 = "20220302110925";
+ Date createDate = DateTimeConvertUtils.yyyyMMddHHmmss2date(dateStr1);
+ BaseEntity entity2 = new BaseEntity(createUser, createDate);
+ entity2.getConfigureInfo(paramMap, true);
+ Assert.assertEquals(paramMap.size(), 6);
+ Assert.assertEquals(paramMap.get("dataVersionId"),
+ String.valueOf(entity2.getDataVerId()));
+ Assert.assertEquals(paramMap.get("serialId"),
+ String.valueOf(entity2.getSerialId()));
+ Assert.assertEquals(paramMap.get("createUser"), createUser);
+ Assert.assertEquals(paramMap.get("createDate"), entity2.getCreateDateStr());
+ Assert.assertEquals(paramMap.get("modifyUser"), createUser);
+ Assert.assertEquals(paramMap.get("modifyDate"), entity2.getModifyDateStr());
+ paramMap.clear();
+ // case 3
+ String modifyUser = "modifyUser";
+ String dateStr2 = "20220302111925";
+ Date modifyDate = DateTimeConvertUtils.yyyyMMddHHmmss2date(dateStr2);
+ BaseEntity entity3 = new BaseEntity(55,
+ createUser, createDate, modifyUser, modifyDate);
+ entity3.getConfigureInfo(paramMap, false);
+ Assert.assertEquals(paramMap.size(), 6);
+ Assert.assertEquals(paramMap.get("dVerId"),
+ String.valueOf(entity3.getDataVerId()));
+ Assert.assertEquals(paramMap.get("serialId"),
+ String.valueOf(entity3.getSerialId()));
+ Assert.assertEquals(paramMap.get("cur"), createUser);
+ Assert.assertEquals(paramMap.get("cDate"), entity3.getCreateDateStr());
+ Assert.assertEquals(paramMap.get("mur"), entity3.getModifyUser());
+ Assert.assertEquals(paramMap.get("mDate"), entity3.getModifyDateStr());
+ paramMap.clear();
+ }
+
}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntityTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntityTest.java
index d780790..4b7d123 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntityTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntityTest.java
@@ -18,7 +18,10 @@
package org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
import org.apache.inlong.tubemq.corebase.utils.SettingValidUtils;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.statusdef.EnableStatus;
@@ -180,4 +183,79 @@ public class ClusterSettingEntityTest {
Assert.assertNotEquals(setting5.getClsDefTopicProps(), setting2.getClsDefTopicProps());
}
+ @Test
+ public void getMapValuesTest() {
+ Map<String, String> paramMap = new HashMap<>();
+ // case 1
+ ClusterSettingEntity entity1 = new ClusterSettingEntity();
+ entity1.getConfigureInfo(paramMap, true);
+ Assert.assertEquals(paramMap.size(), 0);
+ paramMap.clear();
+ // case 2
+ String createUser = "creater";
+ String dateStr1 = "20220302110925";
+ Date createDate = DateTimeConvertUtils.yyyyMMddHHmmss2date(dateStr1);
+ String modifyUser = "modifyUser";
+ String dateStr2 = "20220302111925";
+ Date modifyDate = DateTimeConvertUtils.yyyyMMddHHmmss2date(dateStr2);
+ BaseEntity baseEntity = new BaseEntity(30,
+ createUser, createDate, modifyUser, modifyDate);
+ ClusterSettingEntity entity2 = new ClusterSettingEntity(baseEntity);
+ entity2.getConfigureInfo(paramMap, true);
+ Assert.assertEquals(paramMap.size(), 6);
+ Assert.assertEquals(paramMap.get("dataVersionId"),
+ String.valueOf(30));
+ Assert.assertEquals(paramMap.get("serialId"),
+ String.valueOf(entity2.getSerialId()));
+ Assert.assertEquals(paramMap.get("createUser"), createUser);
+ Assert.assertEquals(paramMap.get("createDate"), dateStr1);
+ Assert.assertEquals(paramMap.get("modifyUser"), modifyUser);
+ Assert.assertEquals(paramMap.get("modifyDate"), dateStr2);
+ paramMap.clear();
+ // case 3
+ entity2.updModifyInfo(5, 6, 7,
+ 8, 9, 101, false,
+ 0, null, null);
+ entity2.getConfigureInfo(paramMap, true);
+ Assert.assertEquals(paramMap.size(), 12);
+ Assert.assertEquals(paramMap.get("dataVersionId"),
+ String.valueOf(5));
+ Assert.assertEquals(paramMap.get("serialId"),
+ String.valueOf(entity2.getSerialId()));
+ Assert.assertEquals(paramMap.get("createUser"), createUser);
+ Assert.assertEquals(paramMap.get("createDate"), dateStr1);
+ Assert.assertEquals(paramMap.get("modifyUser"), modifyUser);
+ Assert.assertEquals(paramMap.get("modifyDate"), dateStr2);
+ Assert.assertEquals(paramMap.get("brokerPort"), String.valueOf(6));
+ Assert.assertEquals(paramMap.get("brokerTLSPort"), String.valueOf(7));
+ Assert.assertEquals(paramMap.get("brokerWebPort"), String.valueOf(8));
+ Assert.assertEquals(paramMap.get("maxMsgSizeInMB"), String.valueOf(9));
+ Assert.assertEquals(paramMap.get("qryPriorityId"), String.valueOf(101));
+ Assert.assertEquals(paramMap.get("flowCtrlEnable"), "false");
+ paramMap.clear();
+ // case 4
+ entity2.updModifyInfo(9, 10, 11,
+ 12, 13, 301, true,
+ 2, "[{\"type\":1},{\"type\":2}]", null);
+ entity2.getConfigureInfo(paramMap, false);
+ Assert.assertEquals(paramMap.size(), 14);
+ Assert.assertEquals(paramMap.get("dVerId"),
+ String.valueOf(9));
+ Assert.assertEquals(paramMap.get("serialId"),
+ String.valueOf(entity2.getSerialId()));
+ Assert.assertEquals(paramMap.get("cur"), createUser);
+ Assert.assertEquals(paramMap.get("cDate"), dateStr1);
+ Assert.assertEquals(paramMap.get("mur"), modifyUser);
+ Assert.assertEquals(paramMap.get("mDate"), dateStr2);
+
+ Assert.assertEquals(paramMap.get("bPort"), String.valueOf(10));
+ Assert.assertEquals(paramMap.get("bTlsPort"), String.valueOf(11));
+ Assert.assertEquals(paramMap.get("bWebPort"), String.valueOf(12));
+ Assert.assertEquals(paramMap.get("mxMsgInMB"), String.valueOf(13));
+ Assert.assertEquals(paramMap.get("qryPriId"), String.valueOf(301));
+ Assert.assertEquals(paramMap.get("fCtrlEn"), "true");
+ Assert.assertEquals(paramMap.get("fCtrlCnt"), String.valueOf(2));
+ Assert.assertEquals(paramMap.get("fCtrlInfo"), "[{\"type\":1},{\"type\":2}]");
+ paramMap.clear();
+ }
}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroupTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroupTest.java
index 894f2a6..1325ee0 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroupTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroupTest.java
@@ -21,6 +21,8 @@ import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.server.common.statusdef.CleanPolType;
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
public class TopicPropGroupTest {
@@ -147,7 +149,53 @@ public class TopicPropGroupTest {
Assert.assertEquals(propsCase3.getRetPeriodInMs(), 10 * 3600 * 1000);
Assert.assertFalse(propsCase3.isMatched(propsCase2));
Assert.assertTrue(propsCase2.updModifyInfo(propsCase3));
-
}
+ @Test
+ public void getMapValuesTest() {
+ Map<String, String> paramMap = new HashMap<>();
+ // case 1
+ TopicPropGroup entity1 = new TopicPropGroup();
+ entity1.getConfigureInfo(paramMap, true);
+ Assert.assertEquals(paramMap.size(), 0);
+ paramMap.clear();
+ // case 2
+ TopicPropGroup entity2 = new TopicPropGroup(1, 2,
+ 3, 4, 5, 6,
+ 7, 8, true, false,
+ "delete,22h", 9, "aaa");
+ entity2.getConfigureInfo(paramMap, true);
+ Assert.assertEquals(paramMap.size(), 13);
+ Assert.assertEquals(paramMap.get("numTopicStores"), String.valueOf(1));
+ Assert.assertEquals(paramMap.get("numPartitions"), String.valueOf(2));
+ Assert.assertEquals(paramMap.get("unflushThreshold"), String.valueOf(3));
+ Assert.assertEquals(paramMap.get("unflushInterval"), String.valueOf(4));
+ Assert.assertEquals(paramMap.get("unflushDataHold"), String.valueOf(5));
+ Assert.assertEquals(paramMap.get("memCacheMsgSizeInMB"), String.valueOf(6));
+ Assert.assertEquals(paramMap.get("memCacheMsgCntInK"), String.valueOf(7));
+ Assert.assertEquals(paramMap.get("memCacheFlushIntvl"), String.valueOf(8));
+ Assert.assertEquals(paramMap.get("acceptPublish"), "true");
+ Assert.assertEquals(paramMap.get("acceptSubscribe"), "false");
+ Assert.assertEquals(paramMap.get("deletePolicy"), "delete,22h");
+ Assert.assertEquals(paramMap.get("dataStoreType"), String.valueOf(9));
+ Assert.assertEquals(paramMap.get("dataPath"), "aaa");
+ paramMap.clear();
+ // case 3
+ entity2.getConfigureInfo(paramMap, false);
+ Assert.assertEquals(paramMap.size(), 13);
+ Assert.assertEquals(paramMap.get("numStore"), String.valueOf(1));
+ Assert.assertEquals(paramMap.get("numPart"), String.valueOf(2));
+ Assert.assertEquals(paramMap.get("unfDskMsgCnt"), String.valueOf(3));
+ Assert.assertEquals(paramMap.get("unfDskInt"), String.valueOf(4));
+ Assert.assertEquals(paramMap.get("unfDskDataSz"), String.valueOf(5));
+ Assert.assertEquals(paramMap.get("cacheInMB"), String.valueOf(6));
+ Assert.assertEquals(paramMap.get("unfMemMsgCnt"), String.valueOf(7));
+ Assert.assertEquals(paramMap.get("unfMemInt"), String.valueOf(8));
+ Assert.assertEquals(paramMap.get("accPub"), "true");
+ Assert.assertEquals(paramMap.get("accSub"), "false");
+ Assert.assertEquals(paramMap.get("delPol"), "delete,22h");
+ Assert.assertEquals(paramMap.get("dStType"), String.valueOf(9));
+ Assert.assertEquals(paramMap.get("dPath"), "aaa");
+ paramMap.clear();
+ }
}