You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/04/16 10:00:25 UTC
[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-596] Add
WebTopicConfHandler class implementation
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new 3030686 [INLONG-596] Add WebTopicConfHandler class implementation
3030686 is described below
commit 30306863c72e9726f9eadb65c819722b22003550
Author: gosonzhang <go...@tencent.com>
AuthorDate: Thu Apr 15 14:54:53 2021 +0800
[INLONG-596] Add WebTopicConfHandler class implementation
---
.../tubemq/server/common/fielddef/WebFieldDef.java | 7 +-
.../server/common/statusdef/ManageStatus.java | 29 +-
.../server/master/metamanage/MetaDataManager.java | 512 +++++++++---
.../metastore/BdbMetaStoreServiceImpl.java | 143 +++-
.../metamanage/metastore/MetaStoreService.java | 36 +-
.../metastore/dao/entity/ClusterSettingEntity.java | 27 +-
.../metastore/dao/entity/TopicCtrlEntity.java | 69 +-
...eployConfEntity.java => TopicDeployEntity.java} | 107 ++-
...loyConfigMapper.java => TopicDeployMapper.java} | 26 +-
...pperImpl.java => BdbTopicDeployMapperImpl.java} | 172 ++--
.../master/web/handler/TopicProcessResult.java | 48 ++
.../web/handler/WebAdminFlowRuleHandler.java | 520 +++++++++++++
.../master/web/handler/WebBrokerConfHandler.java | 18 +-
.../master/web/handler/WebMasterInfoHandler.java | 8 +-
.../master/web/handler/WebTopicConfHandler.java | 862 +++++++++++++++++++++
15 files changed, 2298 insertions(+), 286 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 1ecca31..dafd8b1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -217,10 +217,9 @@ public enum WebFieldDef {
"Broker ip", TBaseConstants.META_MAX_BROKER_IP_LENGTH,
RegexDef.TMP_IPV4ADDRESS),
ISRESERVEDDATA(77, "isReservedData", "isRsvDt",
- WebFieldType.BOOLEAN, "Whether to keep topic data in the broker");
-
-
-
+ WebFieldType.BOOLEAN, "Whether to keep topic data in the broker"),
+ WITHCTRLINFO(78, "ctrlData", "cD",
+ WebFieldType.BOOLEAN, "With topic control data info.");
public final int id;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
index bfb9975..e1f1962 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
@@ -17,22 +17,31 @@
package org.apache.tubemq.server.common.statusdef;
+import org.apache.tubemq.corebase.utils.Tuple2;
+
+
public enum ManageStatus {
- STATUS_MANAGE_UNDEFINED(-2, "Undefined."),
- STATUS_MANAGE_APPLY(1, "Apply."),
- STATUS_MANAGE_ONLINE(5, "Online."),
- STATUS_MANAGE_ONLINE_NOT_WRITE(6, "Online with not write"),
- STATUS_MANAGE_ONLINE_NOT_READ(7, "Online with not read"),
- STATUS_MANAGE_OFFLINE(9, "Offline");
+ STATUS_MANAGE_UNDEFINED(-2, "-", false, false),
+ STATUS_MANAGE_APPLY(1, "draft", false, false),
+ STATUS_MANAGE_ONLINE(5, "online", true, true),
+ STATUS_MANAGE_ONLINE_NOT_WRITE(6, "only-read", false, true),
+ STATUS_MANAGE_ONLINE_NOT_READ(7, "only-write", true, false),
+ STATUS_MANAGE_OFFLINE(9, "offline", false, false);
private int code;
private String description;
+ private boolean isAcceptPublish;
+ private boolean isAcceptSubscribe;
- ManageStatus(int code, String description) {
+ ManageStatus(int code, String description,
+ boolean isAcceptPublish,
+ boolean isAcceptSubscribe) {
this.code = code;
this.description = description;
+ this.isAcceptPublish = isAcceptPublish;
+ this.isAcceptSubscribe = isAcceptSubscribe;
}
public boolean isOnlineStatus() {
@@ -53,6 +62,11 @@ public enum ManageStatus {
return description;
}
+ public Tuple2<Boolean, Boolean> getPubSubStatus() {
+ return new Tuple2<>(isAcceptPublish, isAcceptSubscribe);
+ }
+
+
public static ManageStatus valueOf(int code) {
for (ManageStatus status : ManageStatus.values()) {
if (status.getCode() == code) {
@@ -62,4 +76,5 @@ public enum ManageStatus {
throw new IllegalArgumentException(String.format(
"unknown broker manage status code %s", code));
}
+
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index d3e79dc..9cdd0d3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -55,12 +55,13 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlac
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
import org.apache.tubemq.server.master.nodemanage.nodebroker.TargetValidResult;
import org.apache.tubemq.server.master.web.handler.BrokerProcessResult;
import org.apache.tubemq.server.master.web.handler.GroupProcessResult;
+import org.apache.tubemq.server.master.web.handler.TopicProcessResult;
import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -643,13 +644,13 @@ public class MetaDataManager implements Server {
if (!metaStoreService.checkStoreStatus(true, result)) {
return result.isSuccess();
}
- Map<String, TopicDeployConfEntity> confEntityMap =
+ Map<String, TopicDeployEntity> confEntityMap =
metaStoreService.getConfiguredTopicInfo(brokerId);
if (confEntityMap == null || confEntityMap.isEmpty()) {
return result.isSuccess();
}
for (String topicName : rmvTopics) {
- TopicDeployConfEntity topicEntity = confEntityMap.get(topicName);
+ TopicDeployEntity topicEntity = confEntityMap.get(topicName);
if (topicEntity != null
&& topicEntity.getTopicStatus() == TopicStatus.STATUS_TOPIC_SOFT_REMOVE) {
confDelTopicConfInfo(topicEntity.getModifyUser(),
@@ -679,12 +680,12 @@ public class MetaDataManager implements Server {
if (!metaStoreService.checkStoreStatus(true, result)) {
return result.isSuccess();
}
- Map<String, TopicDeployConfEntity> confEntityMap =
+ Map<String, TopicDeployEntity> confEntityMap =
metaStoreService.getConfiguredTopicInfo(brokerId);
if (confEntityMap == null || confEntityMap.isEmpty()) {
return result.isSuccess();
}
- for (TopicDeployConfEntity topicEntity : confEntityMap.values()) {
+ for (TopicDeployEntity topicEntity : confEntityMap.values()) {
if (topicEntity == null) {
continue;
}
@@ -738,7 +739,7 @@ public class MetaDataManager implements Server {
return metaStoreService.getBrokerConfByBrokerIp(brokerIp);
}
- public Map<String, TopicDeployConfEntity> getBrokerTopicConfEntitySet(int brokerId) {
+ public Map<String, TopicDeployEntity> getBrokerTopicConfEntitySet(int brokerId) {
return metaStoreService.getConfiguredTopicInfo(brokerId);
}
@@ -882,115 +883,357 @@ public class MetaDataManager implements Server {
// ////////////////////////////////////////////////////////////////////////////
- /**
- * Get broker topic entity, if query entity is null, return all topic entity
- *
- * @param qryEntity query conditions
- * @return topic entity map
- */
- public Map<String, List<TopicDeployConfEntity>> getTopicConfEntityMap(
- TopicDeployConfEntity qryEntity) {
- return metaStoreService.getTopicConfMap(qryEntity);
+ public List<TopicProcessResult> addTopicDeployInfo(long dataVerId, String createUsr,
+ Date createDate, Set<Integer> brokerIdSet,
+ Set<String> topicNameSet,
+ TopicPropGroup topicPropInfo,
+ StringBuilder sBuilder,
+ ProcessResult result) {
+ TopicDeployEntity deployConf;
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ // add topic control info
+ addIfAbsentTopicCtrlConf(topicNameSet, createUsr, sBuilder, result);
+ result.clear();
+ // add topic deployment record
+ for (Integer brokerId : brokerIdSet) {
+ BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
+ if (brokerConf == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ retInfo.add(new TopicProcessResult(brokerId, "", result));
+ continue;
+ }
+ for (String topicName : topicNameSet) {
+ TopicDeployEntity deployInfo = getTopicConfInfo(brokerId, topicName);
+ if (deployInfo != null) {
+ result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+ DataOpErrCode.DERR_EXISTED.getDescription());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ deployConf = new TopicDeployEntity(dataVerId, createUsr, createDate);
+ deployConf.setTopicDeployInfo(brokerConf.getBrokerId(),
+ brokerConf.getBrokerIp(), brokerConf.getBrokerPort(), topicName);
+ deployConf.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, null,
+ TopicStatus.STATUS_TOPIC_OK, topicPropInfo);
+ metaStoreService.addTopicConf(deployConf, sBuilder, result);
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ }
+ }
+ return retInfo;
}
- public Map<String, List<TopicDeployConfEntity>> getTopicConfMapByTopicAndBrokerIds(
- Set<String> topicNameSet, Set<Integer> brokerIdSet) {
- return metaStoreService.getTopicDepInfoByTopicBrokerId(topicNameSet, brokerIdSet);
+ public TopicProcessResult addTopicDeployInfo(TopicDeployEntity deployEntity,
+ StringBuilder sBuilder,
+ ProcessResult result) {
+ // add topic control info
+ addIfAbsentTopicCtrlConf(deployEntity.getTopicName(),
+ TBaseConstants.META_VALUE_UNDEFINED,
+ deployEntity.getCreateUser(), sBuilder, result);
+ BrokerConfEntity brokerConf = getBrokerConfByBrokerId(deployEntity.getBrokerId());
+ if (brokerConf == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ return new TopicProcessResult(deployEntity.getBrokerId(), "", result);
+ }
+ // add topic deployment record
+ TopicDeployEntity curDeployInfo =
+ metaStoreService.getTopicConfByeRecKey(deployEntity.getRecordKey());
+ if (curDeployInfo != null) {
+ result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+ DataOpErrCode.DERR_EXISTED.getDescription());
+ return new TopicProcessResult(deployEntity.getBrokerId(),
+ deployEntity.getTopicName(), result);
+ }
+ metaStoreService.addTopicConf(deployEntity, sBuilder, result);
+ return new TopicProcessResult(deployEntity.getBrokerId(),
+ deployEntity.getTopicName(), result);
}
/**
- * Add topic configure
+ * Modify topic config
*
- * @param entity the topic control info entity will be add
- * @param strBuffer the print info string buffer
* @param result the process result return
* @return true if success otherwise false
*/
- public boolean confAddTopicConfig(TopicDeployConfEntity entity,
- StringBuilder strBuffer,
- ProcessResult result) {
- BrokerConfEntity brkEntity =
- metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId());
- if (brkEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_CONDITION_LACK.getCode(),
- "Miss broker configure, please create broker configure first!");
- return result.isSuccess();
+ public List<TopicProcessResult> modTopicConfig(long dataVerId, String modifyUser,
+ Date modifyDate, Set<Integer> brokerIdSet,
+ Set<String> topicNameSet,
+ TopicPropGroup topicProps,
+ StringBuilder sBuilder,
+ ProcessResult result) {
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ // add topic control info
+ addIfAbsentTopicCtrlConf(topicNameSet, modifyUser, sBuilder, result);
+ result.clear();
+ // add topic deployment record
+ TopicDeployEntity curEntity;
+ TopicDeployEntity newEntity;
+ for (Integer brokerId : brokerIdSet) {
+ BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
+ if (brokerConf == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ retInfo.add(new TopicProcessResult(brokerId, "", result));
+ continue;
+ }
+ for (String topicName : topicNameSet) {
+ curEntity = getTopicConfInfo(brokerId, topicName);
+ if (curEntity == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ if (!curEntity.isValidTopicStatus()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuilder.append("Topic of ").append(topicName)
+ .append("is deleted softly in brokerId=").append(brokerId)
+ .append(", please resume the record or hard removed first!")
+ .toString());
+ sBuilder.delete(0, sBuilder.length());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ if (topicProps != null) {
+ if (topicProps.getNumPartitions() != TBaseConstants.META_VALUE_UNDEFINED
+ && topicProps.getNumPartitions() < curEntity.getNumPartitions()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ sBuilder.append("Partition value is less than before,")
+ .append("please confirm the configure first! brokerId=")
+ .append(curEntity.getBrokerId()).append(", topicName=")
+ .append(curEntity.getTopicName())
+ .append(", old Partition value is ")
+ .append(curEntity.getNumPartitions())
+ .append(", new Partition value is ")
+ .append(topicProps.getNumPartitions()).toString());
+ sBuilder.delete(0, sBuilder.length());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ if (topicProps.getNumTopicStores() != TBaseConstants.META_VALUE_UNDEFINED
+ && topicProps.getNumTopicStores() < curEntity.getNumTopicStores()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ sBuilder.append("TopicStores value is less than before,")
+ .append("please confirm the configure first! brokerId=")
+ .append(curEntity.getBrokerId()).append(", topicName=")
+ .append(curEntity.getTopicName())
+ .append(", old TopicStores value is ")
+ .append(curEntity.getNumTopicStores())
+ .append(", new TopicStores value is ")
+ .append(topicProps.getNumTopicStores()).toString());
+ sBuilder.delete(0, sBuilder.length());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ }
+ newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(dataVerId,
+ null, null, modifyUser, modifyDate, null);
+ if (!newEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, null, null, topicProps)) {
+ result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+ sBuilder.append("Data not changed for brokerId=")
+ .append(curEntity.getBrokerId()).append(", topicName=")
+ .append(curEntity.getTopicName()).toString());
+ sBuilder.delete(0, sBuilder.length());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ metaStoreService.updTopicConf(newEntity, sBuilder, result);
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ }
}
- if (metaStoreService.addTopicConf(entity, result)) {
- strBuffer.append("[confAddTopicConfig], ")
- .append(entity.getCreateUser())
- .append(" added topic configure record :")
- .append(entity.toString());
- logger.info(strBuffer.toString());
- } else {
- strBuffer.append("[confAddTopicConfig], ")
- .append("failure to add topic configure record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
+ return retInfo;
+ }
+
+ /**
+ * Modify topic config
+ *
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ public List<TopicProcessResult> modDelOrRmvTopicConf(long dataVerId, String modifyUser,
+ Date modifyDate, Set<Integer> brokerIdSet,
+ Set<String> topicNameSet,
+ TopicStatus topicStatus,
+ StringBuilder sBuilder,
+ ProcessResult result) {
+ TopicDeployEntity curEntity;
+ TopicDeployEntity newEntity;
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ // add topic deployment record
+ for (Integer brokerId : brokerIdSet) {
+ BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
+ if (brokerConf == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ retInfo.add(new TopicProcessResult(brokerId, "", result));
+ continue;
+ }
+ for (String topicName : topicNameSet) {
+ curEntity = getTopicConfInfo(brokerId, topicName);
+ if (curEntity == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ if (curEntity.isAcceptPublish()
+ || curEntity.isAcceptSubscribe()) { // still accept publish and subscribe
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuilder.append("The topic ").append(topicName)
+ .append("'s acceptPublish and acceptSubscribe status must be false in broker=")
+ .append(brokerId).append(" before topic deleted!").toString());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ if ((topicStatus == TopicStatus.STATUS_TOPIC_SOFT_DELETE
+ && !curEntity.isValidTopicStatus())
+ || (topicStatus == TopicStatus.STATUS_TOPIC_SOFT_REMOVE
+ && curEntity.getTopicStatus() != TopicStatus.STATUS_TOPIC_SOFT_DELETE)) {
+ result.setSuccResult("");
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(dataVerId,
+ null, null, modifyUser, modifyDate, null);
+ if (!newEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, null, topicStatus, null)) {
+ result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+ sBuilder.append("Data not changed for brokerId=")
+ .append(curEntity.getBrokerId()).append(", topicName=")
+ .append(curEntity.getTopicName()).toString());
+ sBuilder.delete(0, sBuilder.length());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ metaStoreService.updTopicConf(newEntity, sBuilder, result);
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ }
}
- strBuffer.delete(0, strBuffer.length());
- // add topic control record
- addIfAbsentTopicCtrlConf(entity.getTopicName(), entity.getTopicId(),
- entity.getCreateUser(), strBuffer, new ProcessResult());
- return result.isSuccess();
+ return retInfo;
}
/**
* Modify topic config
*
- * @param entity the topic control info entity will be update
- * @param strBuffer the print info string buffer
* @param result the process result return
* @return true if success otherwise false
*/
- public boolean confModTopicConfig(TopicDeployConfEntity entity,
- StringBuilder strBuffer,
- ProcessResult result) {
- if (metaStoreService.updTopicConf(entity, result)) {
- TopicDeployConfEntity oldEntity =
- (TopicDeployConfEntity) result.getRetData();
- TopicDeployConfEntity curEntity =
- metaStoreService.getTopicConfByeRecKey(entity.getRecordKey());
- strBuffer.append("[confModTopicConfig], ")
- .append(entity.getModifyUser())
- .append(" updated record from :")
- .append(oldEntity.toString())
- .append(" to ").append(curEntity.toString());
- logger.info(strBuffer.toString());
- } else {
- strBuffer.append("[confModTopicConfig], ")
- .append("failure to update topic configure record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
+ public List<TopicProcessResult> modRedoDelTopicConf(long dataVerId, String modifyUser,
+ Date modifyDate, Set<Integer> brokerIdSet,
+ Set<String> topicNameSet,
+ StringBuilder sBuilder,
+ ProcessResult result) {
+ TopicDeployEntity curEntity;
+ TopicDeployEntity newEntity;
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ // add topic deployment record
+ for (Integer brokerId : brokerIdSet) {
+ BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
+ if (brokerConf == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ retInfo.add(new TopicProcessResult(brokerId, "", result));
+ continue;
+ }
+ for (String topicName : topicNameSet) {
+ curEntity = getTopicConfInfo(brokerId, topicName);
+ if (curEntity == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ if (curEntity.isAcceptPublish()
+ || curEntity.isAcceptSubscribe()) { // still accept publish and subscribe
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuilder.append("The topic ").append(topicName)
+ .append("'s acceptPublish and acceptSubscribe status must be false in broker=")
+ .append(brokerId).append(" before topic deleted!").toString());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ if (curEntity.getTopicStatus() != TopicStatus.STATUS_TOPIC_SOFT_DELETE) {
+ if (curEntity.isValidTopicStatus()) {
+ result.setSuccResult(null);
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuilder.append("Topic of ").append(topicName)
+ .append("is in removing flow in brokerId=")
+ .append(curEntity.getBrokerId())
+ .append(", please wait until remove process finished!")
+ .toString());
+ sBuilder.delete(0, sBuilder.length());
+ }
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(dataVerId,
+ null, null, modifyUser, modifyDate, null);
+ if (!newEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, null,
+ TopicStatus.STATUS_TOPIC_OK, null)) {
+ result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+ sBuilder.append("Data not changed for brokerId=")
+ .append(curEntity.getBrokerId()).append(", topicName=")
+ .append(curEntity.getTopicName()).toString());
+ sBuilder.delete(0, sBuilder.length());
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ continue;
+ }
+ metaStoreService.updTopicConf(newEntity, sBuilder, result);
+ retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ }
}
- strBuffer.delete(0, strBuffer.length());
- // add topic control record
- addIfAbsentTopicCtrlConf(entity.getTopicName(), entity.getTopicId(),
- entity.getCreateUser(), strBuffer, new ProcessResult());
- return result.isSuccess();
+ return retInfo;
+ }
+
+
+ /**
+ * Get broker topic entity, if query entity is null, return all topic entity
+ *
+ * @param qryEntity query conditions
+ * @return topic entity map
+ */
+ public Map<String, List<TopicDeployEntity>> getTopicConfEntityMap(Set<String> topicNameSet,
+ Set<Integer> brokerIdSet,
+ TopicDeployEntity qryEntity) {
+ return metaStoreService.getTopicConfMap(topicNameSet, brokerIdSet, qryEntity);
+ }
+
+ public TopicDeployEntity getTopicConfInfo(int brokerId, String topicName) {
+ return metaStoreService.getTopicConf(brokerId, topicName);
+ }
+
+ /**
+ * Get broker topic entity, if query entity is null, return all topic entity
+ *
+ * @return topic entity map
+ */
+ public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+ Set<Integer> brokerIdSet) {
+ return metaStoreService.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
+ }
+
+ public Map<String, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
+ Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+ return metaStoreService.getTopicDepInfoByTopicBrokerId(topicNameSet, brokerIdSet);
}
+
+
+
private boolean confDelTopicConfInfo(String operator,
String recordKey,
StringBuilder strBuffer,
ProcessResult result) {
- if (metaStoreService.delTopicConf(recordKey, result)) {
- GroupResCtrlEntity entity =
- (GroupResCtrlEntity) result.getRetData();
- if (entity != null) {
- strBuffer.append("[confDelTopicConfInfo], ").append(operator)
- .append(" deleted topic configure record :")
- .append(entity.toString());
- logger.info(strBuffer.toString());
- }
- } else {
- strBuffer.append("[confDelTopicConfInfo], ")
- .append("failure to delete topic configure record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
- }
- strBuffer.delete(0, strBuffer.length());
- return result.isSuccess();
+ return metaStoreService.delTopicConf(operator,
+ recordKey, strBuffer, result);
}
public List<String> getBrokerTopicStrConfigInfo(
@@ -1006,7 +1249,7 @@ public class MetaDataManager implements Server {
private List<String> inGetTopicConfStrInfo(BrokerConfEntity brokerEntity,
boolean isRemoved, StringBuilder sBuffer) {
List<String> topicConfStrs = new ArrayList<>();
- Map<String, TopicDeployConfEntity> topicEntityMap =
+ Map<String, TopicDeployEntity> topicEntityMap =
metaStoreService.getConfiguredTopicInfo(brokerEntity.getBrokerId());
if (topicEntityMap.isEmpty()) {
return topicConfStrs;
@@ -1015,7 +1258,7 @@ public class MetaDataManager implements Server {
ClusterSettingEntity clusterDefConf =
metaStoreService.getClusterConfig();
int defMsgSizeInB = clusterDefConf.getMaxMsgSizeInB();
- for (TopicDeployConfEntity topicEntity : topicEntityMap.values()) {
+ for (TopicDeployEntity topicEntity : topicEntityMap.values()) {
/*
* topic:partNum:acceptPublish:acceptSubscribe:unflushThreshold:unflushInterval:deleteWhen:
* deletePolicy:filterStatusId:statusId
@@ -1114,19 +1357,7 @@ public class MetaDataManager implements Server {
public boolean confAddTopicCtrlConf(TopicCtrlEntity entity,
StringBuilder strBuffer,
ProcessResult result) {
- if (metaStoreService.addTopicCtrlConf(entity, result)) {
- strBuffer.append("[confAddTopicCtrlConf], ")
- .append(entity.getCreateUser())
- .append(" added topic control record :")
- .append(entity.toString());
- logger.info(strBuffer.toString());
- } else {
- strBuffer.append("[confAddTopicCtrlConf], ")
- .append("failure to add topic control record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
- }
- strBuffer.delete(0, strBuffer.length());
+ metaStoreService.addTopicCtrlConf(entity, strBuffer, result);
return result.isSuccess();
}
@@ -1138,34 +1369,57 @@ public class MetaDataManager implements Server {
* @param operator operator
* @param strBuffer the print info string buffer
*/
- private void addIfAbsentTopicCtrlConf(String topicName,
- int topicNameId,
- String operator,
- StringBuilder strBuffer,
- ProcessResult result) {
+ public void addIfAbsentTopicCtrlConf(String topicName,
+ int topicNameId,
+ String operator,
+ StringBuilder strBuffer,
+ ProcessResult result) {
TopicCtrlEntity curEntity =
metaStoreService.getTopicCtrlConf(topicName);
if (curEntity != null) {
return;
}
- curEntity = new TopicCtrlEntity(topicName, topicNameId, operator);
- if (metaStoreService.addTopicCtrlConf(curEntity, result)) {
- strBuffer.append("[addIfAbsentTopicCtrlConf], ")
- .append(curEntity.getCreateUser())
- .append(" added topic control record :")
- .append(curEntity.toString());
- logger.info(strBuffer.toString());
- } else {
- strBuffer.append("[addIfAbsentTopicCtrlConf], ")
- .append("failure to add topic control record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
+ int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
+ ClusterSettingEntity defSetting = metaStoreService.getClusterConfig();
+ if (defSetting != null) {
+ maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
}
- strBuffer.delete(0, strBuffer.length());
+ curEntity = new TopicCtrlEntity(topicName, topicNameId, maxMsgSizeInMB, operator);
+ metaStoreService.addTopicCtrlConf(curEntity, strBuffer, result);
return;
}
/**
+ * Add if absent topic control configure info
+ *
+ * @param topicNameSet the topic name will be add
+ * @param operator the topic name id will be add
+ * @param operator operator
+ * @param strBuffer the print info string buffer
+ */
+ public void addIfAbsentTopicCtrlConf(Set<String> topicNameSet,
+ String operator,
+ StringBuilder strBuffer,
+ ProcessResult result) {
+ TopicCtrlEntity curEntity;
+ int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
+ ClusterSettingEntity defSetting = metaStoreService.getClusterConfig();
+ if (defSetting != null) {
+ maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
+ }
+ for (String topicName : topicNameSet) {
+ result.clear();
+ curEntity = metaStoreService.getTopicCtrlConf(topicName);
+ if (curEntity != null) {
+ continue;
+ }
+ curEntity = new TopicCtrlEntity(topicName,
+ TBaseConstants.META_VALUE_UNDEFINED, maxMsgSizeInMB, operator);
+ metaStoreService.addTopicCtrlConf(curEntity, strBuffer, result);
+ }
+ }
+
+ /**
* Update topic control configure
*
* @param entity the topic control info entity will be update
@@ -1231,6 +1485,20 @@ public class MetaDataManager implements Server {
return this.metaStoreService.getTopicCtrlConf(topicName);
}
+ public int getTopicMaxMsgSizeInMB(String topicName) {
+ // get maxMsgSizeInMB info
+ int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
+ ClusterSettingEntity clusterSettingEntity = getClusterDefSetting();
+ if (clusterSettingEntity != null) {
+ maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB();
+ }
+ TopicCtrlEntity topicCtrlEntity = getTopicCtrlByTopicName(topicName);
+ if (topicCtrlEntity != null) {
+ maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
+ }
+ return maxMsgSizeInMB;
+ }
+
/**
* Get topic control entity list
*
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
index 0878e30..2aa4a57 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
@@ -68,21 +68,21 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlac
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.BrokerConfigMapper;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.ClusterConfigMapper;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.GroupBlackListMapper;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.GroupResCtrlMapper;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicCtrlMapper;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployConfigMapper;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployMapper;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbBrokerConfigMapperImpl;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbClusterConfigMapperImpl;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbGroupBlackListMapperImpl;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbGroupConsumeCtrlMapperImpl;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbGroupResCtrlMapperImpl;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbTopicCtrlMapperImpl;
-import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbTopicDeployConfigMapperImpl;
+import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbTopicDeployMapperImpl;
import org.apache.tubemq.server.master.utils.BdbStoreSamplePrint;
import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
@@ -143,7 +143,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
// broker configure
private BrokerConfigMapper brokerConfigMapper;
// topic configure
- private TopicDeployConfigMapper topicDeployConfigMapper;
+ private TopicDeployMapper topicDeployMapper;
// topic control configure
private TopicCtrlMapper topicCtrlMapper;
// group configure
@@ -205,7 +205,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
logger.info("[BDB Impl] Stopping StoreManagerService...");
// close bdb configure
brokerConfigMapper.close();
- topicDeployConfigMapper.close();
+ topicDeployMapper.close();
groupResCtrlMapper.close();
topicCtrlMapper.close();
groupBlackListMapper.close();
@@ -413,30 +413,84 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
// topic configure api
@Override
- public boolean addTopicConf(TopicDeployConfEntity entity, ProcessResult result) {
+ public boolean addTopicConf(TopicDeployEntity entity,
+ StringBuilder strBuffer,
+ ProcessResult result) {
// check current status
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- return topicDeployConfigMapper.addTopicConf(entity, result);
+ if (topicDeployMapper.addTopicConf(entity, result)) {
+ strBuffer.append("[addTopicConf], ")
+ .append(entity.getCreateUser())
+ .append(" added topic configure record :")
+ .append(entity.toString());
+ logger.info(strBuffer.toString());
+ } else {
+ strBuffer.append("[addTopicConf], ")
+ .append("failure to add topic configure record : ")
+ .append(result.getErrInfo());
+ logger.warn(strBuffer.toString());
+ }
+ strBuffer.delete(0, strBuffer.length());
+ return result.isSuccess();
}
@Override
- public boolean updTopicConf(TopicDeployConfEntity entity, ProcessResult result) {
+ public boolean updTopicConf(TopicDeployEntity entity,
+ StringBuilder strBuffer,
+ ProcessResult result) {
// check current status
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- return topicDeployConfigMapper.updTopicConf(entity, result);
+ if (topicDeployMapper.updTopicConf(entity, result)) {
+ TopicDeployEntity oldEntity =
+ (TopicDeployEntity) result.getRetData();
+ TopicDeployEntity curEntity =
+ topicDeployMapper.getTopicConfByeRecKey(entity.getRecordKey());
+ strBuffer.append("[updTopicConf], ")
+ .append(entity.getModifyUser())
+ .append(" updated record from :")
+ .append(oldEntity.toString())
+ .append(" to ").append(curEntity.toString());
+ logger.info(strBuffer.toString());
+ } else {
+ strBuffer.append("[updTopicConf], ")
+ .append("failure to update topic configure record : ")
+ .append(result.getErrInfo());
+ logger.warn(strBuffer.toString());
+ }
+ strBuffer.delete(0, strBuffer.length());
+ return result.isSuccess();
}
@Override
- public boolean delTopicConf(String recordKey, ProcessResult result) {
+ public boolean delTopicConf(String operator,
+ String recordKey,
+ StringBuilder strBuffer,
+ ProcessResult result) {
// check current status
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- return topicDeployConfigMapper.delTopicConf(recordKey, result);
+ if (topicDeployMapper.delTopicConf(recordKey, result)) {
+ GroupResCtrlEntity entity =
+ (GroupResCtrlEntity) result.getRetData();
+ if (entity != null) {
+ strBuffer.append("[delTopicConf], ").append(operator)
+ .append(" deleted topic configure record :")
+ .append(entity.toString());
+ logger.info(strBuffer.toString());
+ }
+ } else {
+ strBuffer.append("[delTopicConf], ")
+ .append("failure to delete topic configure record : ")
+ .append(result.getErrInfo());
+ logger.warn(strBuffer.toString());
+ }
+ strBuffer.delete(0, strBuffer.length());
+ return result.isSuccess();
}
@Override
@@ -448,7 +502,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- if (topicDeployConfigMapper.delTopicConfByBrokerId(brokerId, result)) {
+ if (topicDeployMapper.delTopicConfByBrokerId(brokerId, result)) {
strBuffer.append("[delTopicConfByBrokerId], ")
.append(operator)
.append(" deleted topic deploy record :")
@@ -466,59 +520,86 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
@Override
public boolean hasConfiguredTopics(int brokerId) {
- return topicDeployConfigMapper.hasConfiguredTopics(brokerId);
+ return topicDeployMapper.hasConfiguredTopics(brokerId);
+ }
+
+ @Override
+ public TopicDeployEntity getTopicConfByeRecKey(String recordKey) {
+ return topicDeployMapper.getTopicConfByeRecKey(recordKey);
}
@Override
- public TopicDeployConfEntity getTopicConfByeRecKey(String recordKey) {
- return topicDeployConfigMapper.getTopicConfByeRecKey(recordKey);
+ public List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity) {
+ return topicDeployMapper.getTopicConf(qryEntity);
}
@Override
- public List<TopicDeployConfEntity> getTopicConf(TopicDeployConfEntity qryEntity) {
- return topicDeployConfigMapper.getTopicConf(qryEntity);
+ public TopicDeployEntity getTopicConf(int brokerId, String topicName) {
+ return topicDeployMapper.getTopicConf(brokerId, topicName);
}
@Override
public Map<Integer, Set<String>> getConfiguredTopicInfo(Set<Integer> brokerIdSet) {
- return topicDeployConfigMapper.getConfiguredTopicInfo(brokerIdSet);
+ return topicDeployMapper.getConfiguredTopicInfo(brokerIdSet);
}
@Override
public Map<String, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet) {
- return topicDeployConfigMapper.getTopicBrokerInfo(topicNameSet);
+ return topicDeployMapper.getTopicBrokerInfo(topicNameSet);
}
@Override
public Set<String> getConfiguredTopicSet() {
- return topicDeployConfigMapper.getConfiguredTopicSet();
+ return topicDeployMapper.getConfiguredTopicSet();
}
@Override
- public Map<String/* topicName */, List<TopicDeployConfEntity>> getTopicConfMap(
- TopicDeployConfEntity qryEntity) {
- return topicDeployConfigMapper.getTopicConfMap(qryEntity);
+ public Map<String/* topicName */, List<TopicDeployEntity>> getTopicConfMap(
+ Set<String> topicNameSet, Set<Integer> brokerIdSet, TopicDeployEntity qryEntity) {
+ return topicDeployMapper.getTopicConfMap(topicNameSet, brokerIdSet, qryEntity);
}
@Override
- public Map<String, List<TopicDeployConfEntity>>getTopicDepInfoByTopicBrokerId(
+ public Map<Integer/* brokerId */, List<TopicDeployEntity>> getTopicDeployInfoMap(
+ Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+ return topicDeployMapper.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
+ }
+
+
+ @Override
+ public Map<String, List<TopicDeployEntity>>getTopicDepInfoByTopicBrokerId(
Set<String> topicSet, Set<Integer> brokerIdSet) {
- return topicDeployConfigMapper.getTopicConfMapByTopicAndBrokerIds(topicSet, brokerIdSet);
+ return topicDeployMapper.getTopicConfMapByTopicAndBrokerIds(topicSet, brokerIdSet);
}
@Override
- public Map<String, TopicDeployConfEntity> getConfiguredTopicInfo(int brokerId) {
- return topicDeployConfigMapper.getConfiguredTopicInfo(brokerId);
+ public Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId) {
+ return topicDeployMapper.getConfiguredTopicInfo(brokerId);
}
// topic control api
@Override
- public boolean addTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result) {
+ public boolean addTopicCtrlConf(TopicCtrlEntity entity,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check current status
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- return topicCtrlMapper.addTopicCtrlConf(entity, result);
+ if (topicCtrlMapper.addTopicCtrlConf(entity, result)) {
+ sBuffer.append("[addTopicCtrlConf], ")
+ .append(entity.getCreateUser())
+ .append(" added topic control record :")
+ .append(entity.toString());
+ logger.info(sBuffer.toString());
+ } else {
+ sBuffer.append("[addTopicCtrlConf], ")
+ .append("failure to add topic control record : ")
+ .append(result.getErrInfo());
+ logger.warn(sBuffer.toString());
+ }
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
}
@Override
@@ -1270,7 +1351,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
private void initMetaStore() {
clusterConfigMapper = new BdbClusterConfigMapperImpl(repEnv, storeConfig);
brokerConfigMapper = new BdbBrokerConfigMapperImpl(repEnv, storeConfig);
- topicDeployConfigMapper = new BdbTopicDeployConfigMapperImpl(repEnv, storeConfig);
+ topicDeployMapper = new BdbTopicDeployMapperImpl(repEnv, storeConfig);
groupResCtrlMapper = new BdbGroupResCtrlMapperImpl(repEnv, storeConfig);
topicCtrlMapper = new BdbTopicCtrlMapperImpl(repEnv, storeConfig);
groupConsumeCtrlMapper = new BdbGroupConsumeCtrlMapperImpl(repEnv, storeConfig);
@@ -1282,7 +1363,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
clearCachedRunData();
clusterConfigMapper.loadConfig();
brokerConfigMapper.loadConfig();
- topicDeployConfigMapper.loadConfig();
+ topicDeployMapper.loadConfig();
topicCtrlMapper.loadConfig();
groupResCtrlMapper.loadConfig();
groupBlackListMapper.loadConfig();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index b765543..362baf1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -30,7 +30,7 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlac
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
public interface MetaStoreService extends KeepAlive, Server {
@@ -129,11 +129,18 @@ public interface MetaStoreService extends KeepAlive, Server {
BrokerConfEntity getBrokerConfByBrokerIp(String brokerIp);
// topic configure api
- boolean addTopicConf(TopicDeployConfEntity entity, ProcessResult result);
+ boolean addTopicConf(TopicDeployEntity entity,
+ StringBuilder strBuffer,
+ ProcessResult result);
- boolean updTopicConf(TopicDeployConfEntity entity, ProcessResult result);
+ boolean updTopicConf(TopicDeployEntity entity,
+ StringBuilder strBuffer,
+ ProcessResult result);
- boolean delTopicConf(String recordKey, ProcessResult result);
+ boolean delTopicConf(String operator,
+ String recordKey,
+ StringBuilder strBuffer,
+ ProcessResult result);
boolean delTopicConfByBrokerId(String operator,
int brokerId,
@@ -142,9 +149,11 @@ public interface MetaStoreService extends KeepAlive, Server {
boolean hasConfiguredTopics(int brokerId);
- TopicDeployConfEntity getTopicConfByeRecKey(String recordKey);
+ TopicDeployEntity getTopicConfByeRecKey(String recordKey);
- List<TopicDeployConfEntity> getTopicConf(TopicDeployConfEntity qryEntity);
+ List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity);
+
+ TopicDeployEntity getTopicConf(int brokerId, String topicName);
Set<String> getConfiguredTopicSet();
@@ -152,16 +161,21 @@ public interface MetaStoreService extends KeepAlive, Server {
Map<String/* topicName */, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet);
- Map<String/* topicName */, List<TopicDeployConfEntity>> getTopicConfMap(
- TopicDeployConfEntity qryEntity);
+ Map<String/* topicName */, List<TopicDeployEntity>> getTopicConfMap(
+ Set<String> topicNameSet, Set<Integer> brokerIdSet, TopicDeployEntity qryEntity);
+
+ Map<Integer/* brokerId */, List<TopicDeployEntity>> getTopicDeployInfoMap(
+ Set<String> topicNameSet, Set<Integer> brokerIdSet);
- Map<String/* topicName */, List<TopicDeployConfEntity>> getTopicDepInfoByTopicBrokerId(
+ Map<String/* topicName */, List<TopicDeployEntity>> getTopicDepInfoByTopicBrokerId(
Set<String> topicSet, Set<Integer> brokerIdSet);
- Map<String, TopicDeployConfEntity> getConfiguredTopicInfo(int brokerId);
+ Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId);
// topic control api
- boolean addTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
+ boolean addTopicCtrlConf(TopicCtrlEntity entity,
+ StringBuilder sBuffer,
+ ProcessResult result);
boolean updTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
index d861511..7beb57e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
@@ -45,6 +45,7 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
private int brokerWebPort = TBaseConstants.META_VALUE_UNDEFINED;
private TopicPropGroup clsDefTopicProps = new TopicPropGroup();
private int maxMsgSizeInB = TBaseConstants.META_VALUE_UNDEFINED;
+ private int maxMsgSizeInMB = TBaseConstants.META_VALUE_UNDEFINED;
private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
private EnableStatus gloFlowCtrlStatus = EnableStatus.STATUS_UNDEFINE;
// flow control rule count
@@ -77,6 +78,8 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
bdbEntity.getDeletePolicy(), bdbEntity.getDefDataType(),
bdbEntity.getDefDataPath());
this.maxMsgSizeInB = bdbEntity.getMaxMsgSizeInB();
+ this.maxMsgSizeInMB =
+ this.maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE;
this.qryPriorityId = bdbEntity.getQryPriorityId();
setEnableFlowCtrl(bdbEntity.getEnableGloFlowCtrl());
setGloFlowCtrlInfo(bdbEntity.getGloFlowCtrlCnt(), bdbEntity.getGloFlowCtrlInfo());
@@ -112,9 +115,9 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
this.brokerPort = TBaseConstants.META_DEFAULT_BROKER_PORT;
this.brokerTLSPort = TBaseConstants.META_DEFAULT_BROKER_TLS_PORT;
this.brokerWebPort = TBaseConstants.META_DEFAULT_BROKER_WEB_PORT;
+ this.maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
this.maxMsgSizeInB =
- SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(
- TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
+ SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(this.maxMsgSizeInMB);
this.qryPriorityId = TServerConstants.QRY_PRIORITY_DEF_VALUE;
this.gloFlowCtrlStatus = EnableStatus.STATUS_DISABLE;
this.gloFlowCtrlRuleCnt = 0;
@@ -151,10 +154,12 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
}
// check and set modified field
if (newMaxMsgSizeMB != TBaseConstants.META_VALUE_UNDEFINED) {
- newMaxMsgSizeMB = SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(newMaxMsgSizeMB);
- if (this.maxMsgSizeInB != newMaxMsgSizeMB) {
+ int newMaxMsgSizeB =
+ SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(newMaxMsgSizeMB);
+ if (this.maxMsgSizeInB != newMaxMsgSizeB) {
changed = true;
- this.maxMsgSizeInB = newMaxMsgSizeMB;
+ this.maxMsgSizeInB = newMaxMsgSizeB;
+ this.maxMsgSizeInMB = newMaxMsgSizeMB;
}
}
// check and set qry priority id
@@ -206,6 +211,10 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
return maxMsgSizeInB;
}
+ public int getMaxMsgSizeInMB() {
+ return maxMsgSizeInMB;
+ }
+
public int getQryPriorityId() {
return qryPriorityId;
}
@@ -260,15 +269,11 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
public StringBuilder toWebJsonStr(StringBuilder sBuilder,
boolean isLongName,
boolean fullFormat) {
- int tmpMsgSizeInMB = maxMsgSizeInB;
- if (maxMsgSizeInB != TBaseConstants.META_VALUE_UNDEFINED) {
- tmpMsgSizeInMB /= TBaseConstants.META_MB_UNIT_SIZE;
- }
if (isLongName) {
sBuilder.append("{\"brokerPort\":").append(brokerPort)
.append(",\"brokerTLSPort\":").append(brokerTLSPort)
.append(",\"brokerWebPort\":").append(brokerWebPort)
- .append(",\"maxMsgSizeInMB\":").append(tmpMsgSizeInMB)
+ .append(",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB)
.append(",\"qryPriorityId\":").append(qryPriorityId)
.append(",\"flowCtrlEnable\":").append(gloFlowCtrlStatus.isEnable())
.append(",\"flowCtrlRuleCount\":").append(gloFlowCtrlRuleCnt)
@@ -277,7 +282,7 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
sBuilder.append("{\"bPort\":").append(brokerPort)
.append(",\"bTlsPort\":").append(brokerTLSPort)
.append(",\"bWebPort\":").append(brokerWebPort)
- .append(",\"mxMsgInMB\":").append(tmpMsgSizeInMB)
+ .append(",\"mxMsgInMB\":").append(maxMsgSizeInMB)
.append(",\"qryPriId\":").append(qryPriorityId)
.append(",\"fCtrlEn\":").append(gloFlowCtrlStatus.isEnable())
.append(",\"fCtrlCnt\":").append(gloFlowCtrlRuleCnt)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
index 0bd8f9e..c4b80e4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
@@ -20,6 +20,7 @@ package org.apache.tubemq.server.master.metamanage.metastore.dao.entity;
import java.util.Date;
import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.SettingValidUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.statusdef.EnableStatus;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
@@ -36,17 +37,22 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
private int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
private EnableStatus authCtrlStatus = EnableStatus.STATUS_UNDEFINE;
private int maxMsgSizeInB = TBaseConstants.META_VALUE_UNDEFINED;
+ private int maxMsgSizeInMB = TBaseConstants.META_VALUE_UNDEFINED;
public TopicCtrlEntity() {
super();
}
- public TopicCtrlEntity(String topicName, int topicNameId, String createUser) {
+ public TopicCtrlEntity(String topicName, int topicNameId,
+ int maxMsgSizeInMB, String createUser) {
super(createUser, new Date());
this.topicName = topicName;
this.topicNameId = topicNameId;
this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
+ this.maxMsgSizeInB =
+ SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB);
+ this.maxMsgSizeInMB = maxMsgSizeInMB;
}
public TopicCtrlEntity(String topicName, int topicNameId,
@@ -56,7 +62,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
super(dataVersionId, createUser, createDate, modifyUser, modifyDate);
this.topicName = topicName;
this.topicNameId = topicNameId;
- this.maxMsgSizeInB = maxMsgSizeInB;
+ this.fillMaxMsgSize(maxMsgSizeInB);
if (enableAuth) {
this.authCtrlStatus = EnableStatus.STATUS_ENABLE;
} else {
@@ -69,7 +75,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
bdbEntity.getCreateUser(), bdbEntity.getCreateDate());
this.topicName = bdbEntity.getTopicName();
this.topicNameId = bdbEntity.getTopicId();
- this.maxMsgSizeInB = bdbEntity.getMaxMsgSize();
+ this.fillMaxMsgSize(maxMsgSizeInB);
if (bdbEntity.isEnableAuthControl()) {
this.authCtrlStatus = EnableStatus.STATUS_ENABLE;
} else {
@@ -120,8 +126,8 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
return maxMsgSizeInB;
}
- public void setMaxMsgSizeInB(int maxMsgSizeInB) {
- this.maxMsgSizeInB = maxMsgSizeInB;
+ public int getMaxMsgSizeInMB() {
+ return maxMsgSizeInMB;
}
public int getTopicId() {
@@ -133,6 +139,45 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
}
/**
+ * update subclass field values
+ *
+ * @return if changed
+ */
+ public boolean updModifyInfo(int topicNameId,
+ int newMaxMsgSizeMB,
+ EnableStatus authCtrlStatus) {
+ boolean changed = false;
+ // check and set topicNameId info
+ if (topicNameId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.topicNameId != topicNameId) {
+ changed = true;
+ this.topicNameId = topicNameId;
+ }
+ // check and set modified field
+ if (newMaxMsgSizeMB != TBaseConstants.META_VALUE_UNDEFINED) {
+ int newMaxMsgSizeB =
+ SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(newMaxMsgSizeMB);
+ if (this.maxMsgSizeInB != newMaxMsgSizeB) {
+ changed = true;
+ this.maxMsgSizeInB = newMaxMsgSizeB;
+ this.maxMsgSizeInMB = newMaxMsgSizeMB;
+ }
+ }
+ // check and set authCtrlStatus info
+ if (authCtrlStatus != null
+ && authCtrlStatus != EnableStatus.STATUS_UNDEFINE
+ && this.authCtrlStatus != authCtrlStatus) {
+ changed = true;
+ this.authCtrlStatus = authCtrlStatus;
+ }
+
+ if (changed) {
+ updSerialId();
+ }
+ return changed;
+ }
+
+ /**
* Check whether the specified query item value matches
* Allowed query items:
* topicName, maxMsgSizeInB, authCtrlStatus
@@ -169,20 +214,16 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
public StringBuilder toWebJsonStr(StringBuilder sBuilder,
boolean isLongName,
boolean fullFormat) {
- int tmpMsgSizeInMB = maxMsgSizeInB;
- if (maxMsgSizeInB != TBaseConstants.META_VALUE_UNDEFINED) {
- tmpMsgSizeInMB /= TBaseConstants.META_MB_UNIT_SIZE;
- }
if (isLongName) {
sBuilder.append("{\"topicName\":\"").append(topicName).append("\"")
.append(",\"topicNameId\":").append(topicNameId)
.append(",\"enableAuthControl\":").append(authCtrlStatus.isEnable())
- .append(",\"maxMsgSizeInMB\":").append(tmpMsgSizeInMB);
+ .append(",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB);
} else {
sBuilder.append("{\"topic\":\"").append(topicName).append("\"")
.append(",\"topicId\":").append(topicNameId)
.append(",\"acEn\":").append(authCtrlStatus.isEnable())
- .append(",\"mxMsgInMB\":").append(tmpMsgSizeInMB);
+ .append(",\"mxMsgInMB\":").append(maxMsgSizeInMB);
}
super.toWebJsonStr(sBuilder, isLongName);
if (fullFormat) {
@@ -191,6 +232,12 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
return sBuilder;
}
+ private void fillMaxMsgSize(int maxMsgSizeInB) {
+ this.maxMsgSizeInB = maxMsgSizeInB;
+ this.maxMsgSizeInMB =
+ maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE;
+ }
+
/**
* check if subclass fields is equals
*
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
similarity index 75%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployConfEntity.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index 2802d35..e502854 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -30,7 +30,7 @@ import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
* store the topic configure setting
*
*/
-public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
+public class TopicDeployEntity extends BaseEntity implements Cloneable {
private String recordKey = "";
private String topicName = "";
@@ -44,28 +44,34 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
private TopicPropGroup topicProps = new TopicPropGroup();
- public TopicDeployConfEntity() {
+ public TopicDeployEntity() {
super();
}
- public TopicDeployConfEntity(String topicName, int topicId, int brokerId,
- String brokerIp, int brokerPort,
- TopicPropGroup topicProps, TopicStatus deployStatus,
- long dataVersionId, String createUser,
- Date createDate, String modifyUser, Date modifyDate) {
+ public TopicDeployEntity(long dataVerId, String createUser, Date createDate) {
+ super(dataVerId, createUser, createDate);
+ }
+
+ public TopicDeployEntity(String topicName, int topicId, int brokerId,
+ String brokerIp, int brokerPort,
+ TopicPropGroup topicProps, TopicStatus deployStatus,
+ long dataVersionId, String createUser,
+ Date createDate, String modifyUser, Date modifyDate) {
super(dataVersionId, createUser, createDate, modifyUser, modifyDate);
- setTopicDeployInfo(brokerId, brokerIp, brokerPort, topicName, topicId);
+ setTopicDeployInfo(brokerId, brokerIp, brokerPort, topicName);
+ this.topicNameId = topicId;
this.deployStatus = deployStatus;
this.topicProps = topicProps;
}
- public TopicDeployConfEntity(BdbTopicConfEntity bdbEntity) {
+ public TopicDeployEntity(BdbTopicConfEntity bdbEntity) {
super(bdbEntity.getDataVerId(),
bdbEntity.getCreateUser(), bdbEntity.getCreateDate(),
bdbEntity.getModifyUser(), bdbEntity.getModifyDate());
setTopicDeployInfo(bdbEntity.getBrokerId(),
bdbEntity.getBrokerIp(), bdbEntity.getBrokerPort(),
- bdbEntity.getTopicName(), bdbEntity.getTopicId());
+ bdbEntity.getTopicName());
+ this.topicNameId = bdbEntity.getTopicId();
this.deployStatus = TopicStatus.valueOf(bdbEntity.getTopicStatusId());
this.topicProps =
new TopicPropGroup(bdbEntity.getNumTopicStores(), bdbEntity.getNumPartitions(),
@@ -97,17 +103,17 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
return bdbEntity;
}
- public void setTopicDeployInfo(int brokerId, String brokerIp, int brokerPort,
- String topicName, int topicId) {
+ public void setTopicDeployInfo(int brokerId, String brokerIp,
+ int brokerPort, String topicName) {
this.brokerId = brokerId;
this.brokerIp = brokerIp;
this.brokerPort = brokerPort;
this.topicName = topicName;
- this.topicNameId = topicId;
this.recordKey = KeyBuilderUtils.buildTopicConfRecKey(brokerId, topicName);
this.brokerAddress = KeyBuilderUtils.buildAddressInfo(brokerIp, brokerPort);
}
+
public String getRecordKey() {
return recordKey;
}
@@ -136,6 +142,22 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
this.topicProps = topicProps;
}
+ public int getNumTopicStores() {
+ return this.topicProps.getNumTopicStores();
+ }
+
+ public int getNumPartitions() {
+ return this.topicProps.getNumPartitions();
+ }
+
+ public boolean isAcceptPublish() {
+ return this.topicProps.isAcceptPublish();
+ }
+
+ public boolean isAcceptSubscribe() {
+ return this.topicProps.isAcceptSubscribe();
+ }
+
public int getTopicId() {
return topicNameId;
}
@@ -178,12 +200,59 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
}
/**
+ * update subclass field values
+ *
+ * @return if changed
+ */
+ public boolean updModifyInfo(int topicNameId, int brokerPort, String brokerIp,
+ TopicStatus deployStatus, TopicPropGroup topicProps) {
+ boolean changed = false;
+ // check and set topicNameId info
+ if (topicNameId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.topicNameId != topicNameId) {
+ changed = true;
+ this.topicNameId = topicNameId;
+ }
+ // check and set brokerPort info
+ if (brokerPort != TBaseConstants.META_VALUE_UNDEFINED
+ && this.brokerPort != brokerPort) {
+ changed = true;
+ this.brokerPort = brokerPort;
+ }
+ // check and set filterCondStr info
+ if (TStringUtils.isNotBlank(brokerIp)
+ && !this.brokerIp.equals(brokerIp)) {
+ changed = true;
+ this.brokerIp = brokerIp;
+ }
+ // check and set deployStatus info
+ if (deployStatus != null
+ && deployStatus != TopicStatus.STATUS_TOPIC_UNDEFINED
+ && this.deployStatus != deployStatus) {
+ changed = true;
+ this.deployStatus = deployStatus;
+ }
+ // check and set topicProps info
+ if (topicProps != null
+ && !topicProps.isDataEquals(this.topicProps)) {
+ changed = true;
+ this.topicProps = topicProps;
+ }
+ if (changed) {
+ updSerialId();
+ this.brokerAddress =
+ KeyBuilderUtils.buildAddressInfo(this.brokerIp, this.brokerPort);
+ }
+ return changed;
+ }
+
+ /**
* Check whether the specified query item value matches
* Allowed query items:
* brokerId, topicId, topicName, topicStatus
* @return true: matched, false: not match
*/
- public boolean isMatched(TopicDeployConfEntity target) {
+ public boolean isMatched(TopicDeployEntity target) {
if (target == null) {
return true;
}
@@ -246,7 +315,7 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
* @param other check object
* @return if equals
*/
- public boolean isDataEquals(TopicDeployConfEntity other) {
+ public boolean isDataEquals(TopicDeployEntity other) {
return brokerId == other.brokerId
&& brokerPort == other.brokerPort
&& topicNameId == other.topicNameId
@@ -263,13 +332,13 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
if (this == o) {
return true;
}
- if (!(o instanceof TopicDeployConfEntity)) {
+ if (!(o instanceof TopicDeployEntity)) {
return false;
}
if (!super.equals(o)) {
return false;
}
- TopicDeployConfEntity that = (TopicDeployConfEntity) o;
+ TopicDeployEntity that = (TopicDeployEntity) o;
return isDataEquals(that);
}
@@ -280,9 +349,9 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
}
@Override
- public TopicDeployConfEntity clone() {
+ public TopicDeployEntity clone() {
try {
- TopicDeployConfEntity copy = (TopicDeployConfEntity) super.clone();
+ TopicDeployEntity copy = (TopicDeployEntity) super.clone();
if (copy.getTopicProps() != null) {
copy.setTopicProps(getTopicProps().clone());
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
similarity index 58%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployConfigMapper.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index 832ff2f..b4a1233 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -22,15 +22,15 @@ import java.util.Map;
import java.util.Set;
import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
-public interface TopicDeployConfigMapper extends AbstractMapper {
+public interface TopicDeployMapper extends AbstractMapper {
- boolean addTopicConf(TopicDeployConfEntity entity, ProcessResult result);
+ boolean addTopicConf(TopicDeployEntity entity, ProcessResult result);
- boolean updTopicConf(TopicDeployConfEntity entity, ProcessResult result);
+ boolean updTopicConf(TopicDeployEntity entity, ProcessResult result);
boolean delTopicConf(String recordKey, ProcessResult result);
@@ -39,14 +39,20 @@ public interface TopicDeployConfigMapper extends AbstractMapper {
boolean hasConfiguredTopics(int brokerId);
- List<TopicDeployConfEntity> getTopicConf(TopicDeployConfEntity qryEntity);
+ List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity);
- TopicDeployConfEntity getTopicConfByeRecKey(String recordKey);
+ TopicDeployEntity getTopicConf(int brokerId, String topicName);
- Map<String/* topicName */, List<TopicDeployConfEntity>> getTopicConfMap(
- TopicDeployConfEntity qryEntity);
+ TopicDeployEntity getTopicConfByeRecKey(String recordKey);
- Map<String/* topicName */, List<TopicDeployConfEntity>> getTopicConfMapByTopicAndBrokerIds(
+ Map<String, List<TopicDeployEntity>> getTopicConfMap(Set<String> topicNameSet,
+ Set<Integer> brokerIdSet,
+ TopicDeployEntity qryEntity);
+
+ Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+ Set<Integer> brokerIdSet);
+
+ Map<String/* topicName */, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
Set<String> topicSet, Set<Integer> brokerIdSet);
Map<Integer/* brokerId */, Set<String>> getConfiguredTopicInfo(Set<Integer> brokerIdSet);
@@ -55,5 +61,5 @@ public interface TopicDeployConfigMapper extends AbstractMapper {
Set<String> getConfiguredTopicSet();
- Map<String, TopicDeployConfEntity> getConfiguredTopicInfo(int brokerId);
+ Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
similarity index 72%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployConfigMapperImpl.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
index 5b9dcae..5f6c6c8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
@@ -32,27 +32,28 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
+import org.apache.tubemq.corebase.utils.KeyBuilderUtils;
import org.apache.tubemq.server.common.exception.LoadMetaException;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployConfigMapper;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
+public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
private static final Logger logger =
- LoggerFactory.getLogger(BdbTopicDeployConfigMapperImpl.class);
+ LoggerFactory.getLogger(BdbTopicDeployMapperImpl.class);
// Topic configure store
private EntityStore topicConfStore;
private PrimaryIndex<String/* recordKey */, BdbTopicConfEntity> topicConfIndex;
// data cache
- private ConcurrentHashMap<String/* recordKey */, TopicDeployConfEntity> topicConfCache =
+ private ConcurrentHashMap<String/* recordKey */, TopicDeployEntity> topicConfCache =
new ConcurrentHashMap<>();
private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashSet<String>>
brokerIdCacheIndex = new ConcurrentHashMap<>();
@@ -65,7 +66,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
- public BdbTopicDeployConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
+ public BdbTopicDeployMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
topicConfStore = new EntityStore(repEnv,
TBDBStoreTables.BDB_TOPIC_CONFIG_STORE_NAME, storeConfig);
topicConfIndex =
@@ -96,7 +97,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
logger.warn("[BDB Impl] found Null data while loading topic configure!");
continue;
}
- TopicDeployConfEntity memEntity = new TopicDeployConfEntity(bdbEntity);
+ TopicDeployEntity memEntity = new TopicDeployEntity(bdbEntity);
addOrUpdCacheRecord(memEntity);
count++;
}
@@ -113,8 +114,8 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
}
@Override
- public boolean addTopicConf(TopicDeployConfEntity memEntity, ProcessResult result) {
- TopicDeployConfEntity curEntity =
+ public boolean addTopicConf(TopicDeployEntity memEntity, ProcessResult result) {
+ TopicDeployEntity curEntity =
topicConfCache.get(memEntity.getRecordKey());
if (curEntity != null) {
result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
@@ -131,8 +132,8 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
}
@Override
- public boolean updTopicConf(TopicDeployConfEntity memEntity, ProcessResult result) {
- TopicDeployConfEntity curEntity =
+ public boolean updTopicConf(TopicDeployEntity memEntity, ProcessResult result) {
+ TopicDeployEntity curEntity =
topicConfCache.get(memEntity.getRecordKey());
if (curEntity == null) {
result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
@@ -159,7 +160,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
@Override
public boolean delTopicConf(String recordKey, ProcessResult result) {
- TopicDeployConfEntity curEntity =
+ TopicDeployEntity curEntity =
topicConfCache.get(recordKey);
if (curEntity == null) {
result.setSuccResult(null);
@@ -195,17 +196,17 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
}
@Override
- public TopicDeployConfEntity getTopicConfByeRecKey(String recordKey) {
+ public TopicDeployEntity getTopicConfByeRecKey(String recordKey) {
return topicConfCache.get(recordKey);
}
@Override
- public List<TopicDeployConfEntity> getTopicConf(TopicDeployConfEntity qryEntity) {
- List<TopicDeployConfEntity> retEntitys = new ArrayList<>();
+ public List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity) {
+ List<TopicDeployEntity> retEntitys = new ArrayList<>();
if (qryEntity == null) {
retEntitys.addAll(topicConfCache.values());
} else {
- for (TopicDeployConfEntity entity : topicConfCache.values()) {
+ for (TopicDeployEntity entity : topicConfCache.values()) {
if (entity.isMatched(qryEntity)) {
retEntitys.add(entity);
}
@@ -215,44 +216,121 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
}
@Override
- public Map<String, List<TopicDeployConfEntity>> getTopicConfMap(TopicDeployConfEntity qryEntity) {
- List<TopicDeployConfEntity> items;
- Map<String, List<TopicDeployConfEntity>> retEntityMap = new HashMap<>();
- if (qryEntity == null) {
- for (TopicDeployConfEntity entity : topicConfCache.values()) {
- items = retEntityMap.get(entity.getTopicName());
- if (items == null) {
- items = new ArrayList<>();
- retEntityMap.put(entity.getTopicName(), items);
+ public TopicDeployEntity getTopicConf(int brokerId, String topicName) {
+ String recordKey =
+ KeyBuilderUtils.buildTopicConfRecKey(brokerId, topicName);
+ return topicConfCache.get(recordKey);
+ }
+
+ @Override
+ public Map<String, List<TopicDeployEntity>> getTopicConfMap(Set<String> topicNameSet,
+ Set<Integer> brokerIdSet,
+ TopicDeployEntity qryEntity) {
+ List<TopicDeployEntity> items;
+ Set<String> qryTopicKey = null;
+ ConcurrentHashSet<String> keySet;
+ Map<String, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
+ if (topicNameSet != null && !topicNameSet.isEmpty()) {
+ qryTopicKey = new HashSet<>();
+ for (String topicName : topicNameSet) {
+ keySet = topicNameCacheIndex.get(topicName);
+ if (keySet != null && !keySet.isEmpty()) {
+ qryTopicKey.addAll(keySet);
}
- items.add(entity);
}
- } else {
- for (TopicDeployConfEntity entity : topicConfCache.values()) {
- if (entity.isMatched(qryEntity)) {
- items = retEntityMap.get(entity.getTopicName());
- if (items == null) {
- items = new ArrayList<>();
- retEntityMap.put(entity.getTopicName(), items);
- }
- items.add(entity);
+ }
+ if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
+ if (qryTopicKey == null) {
+ qryTopicKey = new HashSet<>();
+ }
+ for (Integer brokerId : brokerIdSet) {
+ keySet = brokerIdCacheIndex.get(brokerId);
+ if (keySet != null && !keySet.isEmpty()) {
+ qryTopicKey.addAll(keySet);
}
}
}
+ if (qryTopicKey == null) {
+ qryTopicKey = new HashSet<>(topicConfCache.keySet());
+ }
+ if (qryTopicKey.isEmpty()) {
+ return retEntityMap;
+ }
+ for (String recordKey: qryTopicKey) {
+ TopicDeployEntity entity = topicConfCache.get(recordKey);
+ if (entity == null
+ || (qryEntity != null && !qryEntity.isMatched(entity))) {
+ continue;
+ }
+ items = retEntityMap.get(entity.getTopicName());
+ if (items == null) {
+ items = new ArrayList<>();
+ retEntityMap.put(entity.getTopicName(), items);
+ }
+ items.add(entity);
+ }
+ return retEntityMap;
+ }
+
+ @Override
+ public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(
+ Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+ List<TopicDeployEntity> items;
+ Set<String> qryTopicKey = null;
+ ConcurrentHashSet<String> keySet;
+ Map<Integer, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
+ if (topicNameSet != null && !topicNameSet.isEmpty()) {
+ qryTopicKey = new HashSet<>();
+ for (String topicName : topicNameSet) {
+ keySet = topicNameCacheIndex.get(topicName);
+ if (keySet != null && !keySet.isEmpty()) {
+ qryTopicKey.addAll(keySet);
+ }
+ }
+ }
+ if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
+ if (qryTopicKey == null) {
+ qryTopicKey = new HashSet<>();
+ }
+ for (Integer brokerId : brokerIdSet) {
+ keySet = brokerIdCacheIndex.get(brokerId);
+ if (keySet != null && !keySet.isEmpty()) {
+ qryTopicKey.addAll(keySet);
+ }
+ }
+ }
+ if (qryTopicKey == null) {
+ qryTopicKey = new HashSet<>(topicConfCache.keySet());
+ }
+ if (qryTopicKey.isEmpty()) {
+ return retEntityMap;
+ }
+ for (String recordKey: qryTopicKey) {
+ TopicDeployEntity entity = topicConfCache.get(recordKey);
+ if (entity == null) {
+ continue;
+ }
+ items = retEntityMap.get(entity.getBrokerId());
+ if (items == null) {
+ items = new ArrayList<>();
+ retEntityMap.put(entity.getBrokerId(), items);
+ }
+ items.add(entity);
+ }
return retEntityMap;
}
@Override
- public Map<String, List<TopicDeployConfEntity>> getTopicConfMapByTopicAndBrokerIds(
+ public Map<String, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
Set<String> topicSet, Set<Integer> brokerIdSet) {
- TopicDeployConfEntity tmpEntity;
- List<TopicDeployConfEntity> itemLst;
+ TopicDeployEntity tmpEntity;
+ List<TopicDeployEntity> itemLst;
ConcurrentHashSet<String> recSet;
Set<String> hitKeys = new HashSet<>();
- Map<String, List<TopicDeployConfEntity>> retEntityMap = new HashMap<>();
+ Map<String, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
if (((topicSet == null) || (topicSet.isEmpty()))
&& ((brokerIdSet == null) || (brokerIdSet.isEmpty()))) {
- for (TopicDeployConfEntity entity : topicConfCache.values()) {
+ for (TopicDeployEntity entity : topicConfCache.values()) {
itemLst = retEntityMap.get(entity.getTopicName());
if (itemLst == null) {
itemLst = new ArrayList<>();
@@ -344,7 +422,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
continue;
}
for (String key : keySet) {
- TopicDeployConfEntity entity = topicConfCache.get(key);
+ TopicDeployEntity entity = topicConfCache.get(key);
if (entity == null) {
continue;
}
@@ -367,9 +445,9 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
}
@Override
- public Map<String, TopicDeployConfEntity> getConfiguredTopicInfo(int brokerId) {
- TopicDeployConfEntity tmpEntity;
- Map<String, TopicDeployConfEntity> retEntityMap = new HashMap<>();
+ public Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId) {
+ TopicDeployEntity tmpEntity;
+ Map<String, TopicDeployEntity> retEntityMap = new HashMap<>();
ConcurrentHashSet<String> records = brokerIdCacheIndex.get(brokerId);
if (records == null || records.isEmpty()) {
return retEntityMap;
@@ -391,7 +469,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
* @param result process result with old value
* @return
*/
- private boolean putTopicConfig2Bdb(TopicDeployConfEntity memEntity, ProcessResult result) {
+ private boolean putTopicConfig2Bdb(TopicDeployEntity memEntity, ProcessResult result) {
BdbTopicConfEntity retData = null;
BdbTopicConfEntity bdbEntity =
memEntity.buildBdbTopicConfEntity();
@@ -420,7 +498,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
}
private void delCacheRecord(String recordKey) {
- TopicDeployConfEntity curEntity =
+ TopicDeployEntity curEntity =
topicConfCache.remove(recordKey);
if (curEntity == null) {
return;
@@ -452,7 +530,7 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
}
}
- private void addOrUpdCacheRecord(TopicDeployConfEntity entity) {
+ private void addOrUpdCacheRecord(TopicDeployEntity entity) {
topicConfCache.put(entity.getRecordKey(), entity);
// add topic index map
ConcurrentHashSet<String> keySet =
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/TopicProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/TopicProcessResult.java
new file mode 100644
index 0000000..a8bbd62
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/TopicProcessResult.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.web.handler;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+
+
+public class TopicProcessResult extends ProcessResult {
+ private int brokerId = TBaseConstants.META_VALUE_UNDEFINED;
+ private String topicName = "";
+
+ public TopicProcessResult() {
+
+ }
+
+ public TopicProcessResult(int brokerId,
+ String topicName,
+ ProcessResult result) {
+ super(result);
+ this.brokerId = brokerId;
+ this.topicName = topicName;
+ }
+
+ public int getBrokerId() {
+ return brokerId;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
new file mode 100644
index 0000000..fe1501a
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
@@ -0,0 +1,520 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.web.handler;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.policies.FlowCtrlItem;
+import org.apache.tubemq.corebase.policies.FlowCtrlRuleHandler;
+import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
+
+
+
+public class WebAdminFlowRuleHandler extends AbstractWebHandler {
+
+ private static final String blankFlowCtrlRules = "[]";
+ private static final List<Integer> allowedPriorityVal = Arrays.asList(1, 2, 3);
+ private static final Set<String> rsvGroupNameSet =
+ new HashSet<>(Arrays.asList(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL));
+
+
+ public WebAdminFlowRuleHandler(TMaster master) {
+ super(master);
+ }
+
+ @Override
+ public void registerWebApiMethod() {
+ // register query method
+ registerQueryWebMethod("admin_query_def_flow_control_rule",
+ "adminQueryDefGroupFlowCtrlRule");
+ registerQueryWebMethod("admin_query_group_flow_control_rule",
+ "adminQuerySpecGroupFlowCtrlRule");
+ // register modify method
+ registerModifyWebMethod("admin_set_def_flow_control_rule",
+ "adminSetDefGroupFlowCtrlRule");
+ registerModifyWebMethod("admin_set_group_flow_control_rule",
+ "adminSetSpecGroupFlowCtrlRule");
+ registerModifyWebMethod("admin_rmv_def_flow_control_rule",
+ "adminDelDefGroupFlowCtrlRuleStatus");
+ registerModifyWebMethod("admin_rmv_group_flow_control_rule",
+ "adminDelSpecGroupFlowCtrlRuleStatus");
+ registerModifyWebMethod("admin_upd_def_flow_control_rule",
+ "adminModDefGroupFlowCtrlRuleStatus");
+ registerModifyWebMethod("admin_upd_group_flow_control_rule",
+ "adminModSpecGroupFlowCtrlRuleStatus");
+ }
+
+ public StringBuilder adminQueryDefGroupFlowCtrlRule(HttpServletRequest req) {
+ return innQueryGroupFlowCtrlRule(req, true);
+ }
+
+ public StringBuilder adminQuerySpecGroupFlowCtrlRule(HttpServletRequest req) {
+ return innQueryGroupFlowCtrlRule(req, false);
+ }
+
+ public StringBuilder adminSetDefGroupFlowCtrlRule(HttpServletRequest req) {
+ return innSetFlowControlRule(req, true);
+ }
+
+ public StringBuilder adminSetSpecGroupFlowCtrlRule(HttpServletRequest req) {
+ return innSetFlowControlRule(req, false);
+ }
+
+ public StringBuilder adminDelDefGroupFlowCtrlRuleStatus(HttpServletRequest req) {
+ return innDelGroupFlowCtrlRuleStatus(req, true);
+ }
+
+ public StringBuilder adminDelSpecGroupFlowCtrlRuleStatus(HttpServletRequest req) {
+ return innDelGroupFlowCtrlRuleStatus(req, false);
+ }
+
+ public StringBuilder adminModDefGroupFlowCtrlRuleStatus(HttpServletRequest req) {
+ return innModGroupFlowCtrlRuleStatus(req, true);
+ }
+
+ public StringBuilder adminModSpecGroupFlowCtrlRuleStatus(HttpServletRequest req) {
+ return innModGroupFlowCtrlRuleStatus(req, false);
+ }
+
+ /**
+ * add flow control rule
+ *
+ * @param req
+ * @param do4DefFlowCtrl
+ * @return
+ */
+ private StringBuilder innSetFlowControlRule(HttpServletRequest req,
+ boolean do4DefFlowCtrl) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // get createUser info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.CREATEUSER, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ String createUser = (String) result.retData1;
+ // check and get create date
+ if (!WebParameterUtils.getDateParameter(req,
+ WebFieldDef.CREATEDATE, false, new Date(), result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Date createDate = (Date) result.retData1;
+ // get rule required status info
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.STATUSID, false, 0, 0, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ int statusId = (int) result.retData1;
+ // get and valid priority info
+ if (!getQryPriorityIdWithCheck(req, false, 301, 101, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ int qryPriorityId = (int) result.retData1;
+ // get group name info
+ if (!getGroupNameWithCheck(req, true, do4DefFlowCtrl, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> batchGroupNames = (Set<String>) result.retData1;
+ // get and flow control rule info
+ int ruleCnt = getAndCheckFlowRules(req, blankFlowCtrlRules, result);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ String flowCtrlInfo = (String) result.retData1;
+ try {
+ // add flow control to bdb
+ for (String groupName : batchGroupNames) {
+ if (groupName.equals(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL)) {
+ brokerConfManager.confAddBdbGroupFlowCtrl(
+ new BdbGroupFlowCtrlEntity(flowCtrlInfo,
+ statusId, ruleCnt, qryPriorityId, "",
+ false, createUser, createDate));
+ } else {
+ brokerConfManager.confAddBdbGroupFlowCtrl(
+ new BdbGroupFlowCtrlEntity(groupName,
+ flowCtrlInfo, statusId, ruleCnt, qryPriorityId, "",
+ false, createUser, createDate));
+ }
+ }
+ WebParameterUtils.buildSuccessResult(sBuilder);
+ } catch (Exception e) {
+ WebParameterUtils.buildFailResult(sBuilder, e.getMessage());
+ }
+ return sBuilder;
+ }
+
+ /**
+ * delete flow control rule
+ *
+ * @param req
+ * @param do4DefFlowCtrl
+ * @return
+ */
+ private StringBuilder innDelGroupFlowCtrlRuleStatus(HttpServletRequest req,
+ boolean do4DefFlowCtrl) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // get modifyUser info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.CREATEUSER, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ String modifyUser = (String) result.retData1;
+ // check and get modifyDate date
+ if (!WebParameterUtils.getDateParameter(req,
+ WebFieldDef.CREATEDATE, false, new Date(), result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Date modifyDate = (Date) result.retData1;
+ // get group name info
+ if (!getGroupNameWithCheck(req, true, do4DefFlowCtrl, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> batchGroupNames = (Set<String>) result.retData1;
+ try {
+ brokerConfManager.confDeleteBdbGroupFlowCtrl(batchGroupNames);
+ WebParameterUtils.buildSuccessResult(sBuilder);
+ } catch (Exception e) {
+ WebParameterUtils.buildFailResult(sBuilder, e.getMessage());
+ }
+ return sBuilder;
+ }
+
+ /**
+ * modify flow control rule
+ *
+ * @param req
+ * @param do4DefFlowCtrl
+ * @return
+ */
+ private StringBuilder innModGroupFlowCtrlRuleStatus(HttpServletRequest req,
+ boolean do4DefFlowCtrl) {
+ // #lizard forgives
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // get modifyUser info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.CREATEUSER, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ String modifyUser = (String) result.retData1;
+ // check and get modifyDate date
+ if (!WebParameterUtils.getDateParameter(req,
+ WebFieldDef.CREATEDATE, false, new Date(), result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Date modifyDate = (Date) result.retData1;
+ // get group name info
+ if (!getGroupNameWithCheck(req, true, do4DefFlowCtrl, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> batchGroupNames = (Set<String>) result.retData1;
+ // get rule required status info
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.STATUSID, false,
+ TBaseConstants.META_VALUE_UNDEFINED, 0, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ int statusId = (int) result.retData1;
+ // get and flow control rule info
+ int ruleCnt = getAndCheckFlowRules(req, null, result);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ String newFlowCtrlInfo = (String) result.retData1;
+ // get and valid priority info
+ if (!getQryPriorityIdWithCheck(req, false,
+ TBaseConstants.META_VALUE_UNDEFINED, 101, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ int qryPriorityId = (int) result.retData1;
+ try {
+ boolean foundChange;
+ for (String groupName : batchGroupNames) {
+ // check if record changed
+ BdbGroupFlowCtrlEntity oldEntity =
+ brokerConfManager.getBdbGroupFlowCtrl(groupName);
+ if (oldEntity != null) {
+ foundChange = false;
+ BdbGroupFlowCtrlEntity newGroupFlowCtrlEntity =
+ new BdbGroupFlowCtrlEntity(oldEntity.getGroupName(),
+ oldEntity.getFlowCtrlInfo(), oldEntity.getStatusId(),
+ oldEntity.getRuleCnt(), oldEntity.getAttributes(),
+ oldEntity.getSsdTranslateId(), oldEntity.isNeedSSDProc(),
+ oldEntity.getCreateUser(), oldEntity.getCreateDate());
+ if (statusId != TBaseConstants.META_VALUE_UNDEFINED
+ && statusId != oldEntity.getStatusId()) {
+ foundChange = true;
+ newGroupFlowCtrlEntity.setStatusId(statusId);
+ }
+ if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
+ && qryPriorityId != oldEntity.getQryPriorityId()) {
+ foundChange = true;
+ newGroupFlowCtrlEntity.setQryPriorityId(qryPriorityId);
+ }
+ if (TStringUtils.isNotBlank(newFlowCtrlInfo)
+ && !newFlowCtrlInfo.equals(oldEntity.getFlowCtrlInfo())) {
+ foundChange = true;
+ newGroupFlowCtrlEntity.setFlowCtrlInfo(ruleCnt, newFlowCtrlInfo);
+ }
+ // update record if found change
+ if (foundChange) {
+ try {
+ newGroupFlowCtrlEntity.setModifyInfo(modifyUser, modifyDate);
+ brokerConfManager.confUpdateBdbGroupFlowCtrl(newGroupFlowCtrlEntity);
+ } catch (Throwable ee) {
+ //
+ }
+ }
+ }
+ }
+ WebParameterUtils.buildSuccessResult(sBuilder);
+ } catch (Exception e) {
+ WebParameterUtils.buildFailResult(sBuilder, e.getMessage());
+ }
+ return sBuilder;
+ }
+
+ /**
+ * query flow control rule
+ *
+ * @param req
+ * @param do4DefFlowCtrl
+ * @return
+ */
+ private StringBuilder innQueryGroupFlowCtrlRule(HttpServletRequest req,
+ boolean do4DefFlowCtrl) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ BdbGroupFlowCtrlEntity bdbGroupFlowCtrlEntity = new BdbGroupFlowCtrlEntity();
+ // get modifyUser info
+ WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.CREATEUSER, false, null, result);
+ bdbGroupFlowCtrlEntity.setCreateUser((String) result.retData1);
+ // get status id info
+ WebParameterUtils.getIntParamValue(req, WebFieldDef.STATUSID, false,
+ TBaseConstants.META_VALUE_UNDEFINED, 0, result);
+ bdbGroupFlowCtrlEntity.setStatusId((int) result.retData1);
+ // get and valid priority info
+ getQryPriorityIdWithCheck(req, false,
+ TBaseConstants.META_VALUE_UNDEFINED, 101, result);
+ bdbGroupFlowCtrlEntity.setQryPriorityId((int) result.retData1);
+ getGroupNameWithCheck(req, false, do4DefFlowCtrl, result);
+ Set<String> batchGroupNames = (Set<String>) result.retData1;
+ // query group flow ctrl infos
+ List<BdbGroupFlowCtrlEntity> webGroupFlowCtrlEntities =
+ brokerConfManager.confGetBdbGroupFlowCtrl(bdbGroupFlowCtrlEntity);
+ int totalCnt = 0;
+ boolean found = false;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ if (do4DefFlowCtrl) {
+ for (BdbGroupFlowCtrlEntity entity : webGroupFlowCtrlEntities) {
+ if (entity.getGroupName().equals(
+ TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL)) {
+ if (totalCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder = entity.toJsonString(sBuilder);
+ break;
+ }
+ }
+ } else {
+ for (BdbGroupFlowCtrlEntity entity : webGroupFlowCtrlEntities) {
+ if (entity.getGroupName().equals(
+ TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL)) {
+ continue;
+ }
+ found = false;
+ for (String tmpGroupName : batchGroupNames) {
+ if (entity.getGroupName().equals(tmpGroupName)) {
+ found = true;
+ break;
+ }
+ }
+ if (found) {
+ if (totalCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder = entity.toJsonString(sBuilder);
+ }
+ }
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
+ return sBuilder;
+ }
+
+ // translate rule info to json format string
+ private int getAndCheckFlowRules(HttpServletRequest req,
+ String defValue,
+ ProcessResult result) {
+ int ruleCnt = 0;
+ StringBuilder strBuffer = new StringBuilder(512);
+ // get parameter value
+ String paramValue = req.getParameter(WebFieldDef.FLOWCTRLSET.name);
+ if (paramValue == null) {
+ paramValue = req.getParameter(WebFieldDef.FLOWCTRLSET.shortName);
+ }
+ if (TStringUtils.isBlank(paramValue)) {
+ result.setSuccResult(defValue);
+ return ruleCnt;
+ }
+ strBuffer.append("[");
+ paramValue = paramValue.trim();
+ List<Integer> ruleTypes = Arrays.asList(0, 1, 2, 3);
+ FlowCtrlRuleHandler flowCtrlRuleHandler =
+ new FlowCtrlRuleHandler(true);
+ Map<Integer, List<FlowCtrlItem>> flowCtrlItemMap;
+ try {
+ flowCtrlItemMap =
+ flowCtrlRuleHandler.parseFlowCtrlInfo(paramValue);
+ } catch (Throwable e) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Parse parameter ").append(WebFieldDef.FLOWCTRLSET.name)
+ .append(" failure: '").append(e.toString()).toString());
+ return 0;
+ }
+ for (Integer typeId : ruleTypes) {
+ if (typeId != null) {
+ int rules = 0;
+ List<FlowCtrlItem> flowCtrlItems = flowCtrlItemMap.get(typeId);
+ if (flowCtrlItems != null) {
+ if (ruleCnt++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer.append("{\"type\":").append(typeId.intValue()).append(",\"rule\":[");
+ for (FlowCtrlItem flowCtrlItem : flowCtrlItems) {
+ if (flowCtrlItem != null) {
+ if (rules++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer = flowCtrlItem.toJsonString(strBuffer);
+ }
+ }
+ strBuffer.append("]}");
+ }
+ }
+ }
+ strBuffer.append("]");
+ result.setSuccResult(strBuffer.toString());
+ return ruleCnt;
+ }
+
+ private boolean getGroupNameWithCheck(HttpServletRequest req, boolean required,
+ boolean do4DefFlowCtrl, ProcessResult result) {
+ if (do4DefFlowCtrl) {
+ result.setSuccResult(rsvGroupNameSet);
+ return true;
+ }
+ // get group list
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, required, null, result)) {
+ return result.success;
+ }
+ Set<String> inGroupSet = (Set<String>) result.retData1;
+ for (String rsvGroup : rsvGroupNameSet) {
+ if (inGroupSet.contains(rsvGroup)) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Illegal value in ").append(WebFieldDef.COMPSGROUPNAME.name)
+ .append(" parameter: '").append(rsvGroup)
+ .append("' is a system reserved value!").toString());
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean getQryPriorityIdWithCheck(HttpServletRequest req, boolean required,
+ int defValue, int minValue,
+ ProcessResult result) {
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.QRYPRIORITYID, required,
+ defValue, minValue, result)) {
+ return result.success;
+ }
+ int qryPriorityId = (int) result.retData1;
+ if (qryPriorityId > 303 || qryPriorityId < 101) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Illegal value in ").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" parameter: ").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" value must be greater than or equal")
+ .append(" to 101 and less than or equal to 303!").toString());
+ return false;
+ }
+ if (!allowedPriorityVal.contains(qryPriorityId % 100)) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Illegal value in ").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" parameter: the units of ").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" must in ").append(allowedPriorityVal).toString());
+ return false;
+ }
+ if (!allowedPriorityVal.contains(qryPriorityId / 100)) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Illegal value in ").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" parameter: the hundreds of ").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" must in ").append(allowedPriorityVal).toString());
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
index f11e3b9..4779674 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
@@ -39,7 +39,7 @@ import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -207,7 +207,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
int totalCnt = 0;
WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
for (BrokerConfEntity entity : qryResult.values()) {
- Map<String, TopicDeployConfEntity> topicConfEntityMap =
+ Map<String, TopicDeployEntity> topicConfEntityMap =
metaDataManager.getBrokerTopicConfEntitySet(entity.getBrokerId());
if (!isValidRecord(topicNameSet, isInclude, topicStatus, topicConfEntityMap)) {
continue;
@@ -518,7 +518,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
// check broker configure status
List<BrokerProcessResult> retInfo = new ArrayList<>();
Map<Integer, BrokerConfEntity> needDelMap = new HashMap<>();
- Map<String, TopicDeployConfEntity> topicConfigMap;
+ Map<String, TopicDeployEntity> topicConfigMap;
for (BrokerConfEntity entity : qryResult.values()) {
if (entity == null) {
continue;
@@ -539,7 +539,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
}
boolean isMatched = true;
if (isReservedData) {
- for (Map.Entry<String, TopicDeployConfEntity> entry : topicConfigMap.entrySet()) {
+ for (Map.Entry<String, TopicDeployEntity> entry : topicConfigMap.entrySet()) {
if (entry.getValue() == null) {
continue;
}
@@ -579,7 +579,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
continue;
}
if (isReservedData) {
- Map<String, TopicDeployConfEntity> brokerTopicConfMap =
+ Map<String, TopicDeployEntity> brokerTopicConfMap =
metaDataManager.getBrokerTopicConfEntitySet(entry.getBrokerId());
if (brokerTopicConfMap != null) {
metaDataManager.delBrokerTopicConfig(opTupleInfo.getF1(),
@@ -606,7 +606,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
*/
private boolean isValidRecord(Set<String> qryTopicSet, Boolean isInclude,
TopicStatus topicStatus,
- Map<String, TopicDeployConfEntity> topicConfEntityMap) {
+ Map<String, TopicDeployEntity> topicConfEntityMap) {
if ((topicConfEntityMap == null) || (topicConfEntityMap.isEmpty())) {
if ((qryTopicSet.isEmpty() || !isInclude)
&& topicStatus == TopicStatus.STATUS_TOPIC_UNDEFINED) {
@@ -639,7 +639,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
}
}
// second check topic status if match
- for (TopicDeployConfEntity topicConfEntity : topicConfEntityMap.values()) {
+ for (TopicDeployEntity topicConfEntity : topicConfEntityMap.values()) {
if (topicConfEntity.getDeployStatus() == topicStatus) {
return true;
}
@@ -656,12 +656,12 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
* @return
*/
private StringBuilder addTopicInfo(Boolean withTopic, StringBuilder sBuilder,
- Map<String, TopicDeployConfEntity> topicConfEntityMap) {
+ Map<String, TopicDeployEntity> topicConfEntityMap) {
if (withTopic) {
sBuilder.append(",\"topicSet\":[");
int topicCount = 0;
if (topicConfEntityMap != null) {
- for (TopicDeployConfEntity topicEntity : topicConfEntityMap.values()) {
+ for (TopicDeployEntity topicEntity : topicConfEntityMap.values()) {
if (topicCount++ > 0) {
sBuilder.append(",");
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index edeb31a..532a05f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -37,7 +37,7 @@ import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
@@ -303,7 +303,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
}
Set<String> topicNameSet = (Set<String>) result.retData1;
// query topic configure info
- Map<String, List<TopicDeployConfEntity>> topicConfMap =
+ Map<String, List<TopicDeployEntity>> topicConfMap =
metaDataManager.getTopicConfMapByTopicAndBrokerIds(topicNameSet, brokerIds);
TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
int totalCount = 0;
@@ -316,7 +316,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
boolean isAcceptSubscribe = false;
boolean enableAuthControl = false;
WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
- for (Map.Entry<String, List<TopicDeployConfEntity>> entry : topicConfMap.entrySet()) {
+ for (Map.Entry<String, List<TopicDeployEntity>> entry : topicConfMap.entrySet()) {
if (totalCount++ > 0) {
sBuilder.append(",");
}
@@ -328,7 +328,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
enableAuthControl = false;
isAcceptPublish = false;
isAcceptSubscribe = false;
- for (TopicDeployConfEntity entity : entry.getValue()) {
+ for (TopicDeployEntity entity : entry.getValue()) {
brokerCount++;
TopicPropGroup topicProps = entity.getTopicProps();
totalCfgNumPartCount +=
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicConfHandler.java
new file mode 100644
index 0000000..a71fb12
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicConfHandler.java
@@ -0,0 +1,862 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.web.handler;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.tubemq.corebase.cluster.TopicInfo;
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class WebTopicConfHandler extends AbstractWebHandler {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(WebTopicConfHandler.class);
+
+ /**
+ * Constructor
+ *
+ * @param master tube master
+ */
+ public WebTopicConfHandler(TMaster master) {
+ super(master);
+ }
+
+
+
+ @Override
+ public void registerWebApiMethod() {
+ // register query method
+ registerQueryWebMethod("admin_query_topic_info",
+ "adminQueryTopicCfgEntityAndRunInfo");
+ registerQueryWebMethod("admin_query_broker_topic_config_info",
+ "adminQueryBrokerTopicCfgAndRunInfo");
+ registerQueryWebMethod("admin_query_topicName",
+ "adminQuerySimpleTopicName");
+ registerQueryWebMethod("admin_query_brokerId",
+ "adminQuerySimpleBrokerId");
+ // register modify method
+ registerModifyWebMethod("admin_add_new_topic_record",
+ "adminAddTopicEntityInfo");
+ registerModifyWebMethod("admin_bath_add_new_topic_record",
+ "adminBatchAddTopicEntityInfo");
+ registerModifyWebMethod("admin_modify_topic_info",
+ "adminModifyTopicEntityInfo");
+ registerModifyWebMethod("admin_delete_topic_info",
+ "adminDeleteTopicEntityInfo");
+ registerModifyWebMethod("admin_redo_deleted_topic_info",
+ "adminRedoDeleteTopicEntityInfo");
+ registerModifyWebMethod("admin_remove_topic_info",
+ "adminRemoveTopicEntityInfo");
+ }
+
+ /**
+ * Query topic info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminQueryTopicCfgEntityAndRunInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ TopicDeployEntity qryEntity = new TopicDeployEntity();
+ // get queried operation info, for createUser, modifyUser, dataVersionId
+ if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get topicName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, false, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ // get brokerPort field
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ int brokerPort = (int) result.getRetData();
+ // get and valid topicProps info
+ if (!WebParameterUtils.getTopicPropInfo(req,
+ null, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
+ // get and valid TopicStatusId info
+ if (!WebParameterUtils.getTopicStatusParamValue(req,
+ false, TopicStatus.STATUS_TOPIC_UNDEFINED, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ TopicStatus topicStatus = (TopicStatus) result.getRetData();
+ qryEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, null, topicStatus, topicProps);
+ Map<String, List<TopicDeployEntity>> topicDeployInfoMap =
+ metaDataManager.getTopicConfEntityMap(topicNameSet, brokerIdSet, qryEntity);
+ // build query result
+ return buildQueryResult(sBuffer, true, topicDeployInfoMap);
+ }
+
+ private StringBuilder buildQueryResult(StringBuilder sBuffer,
+ boolean withAuthInfo,
+ Map<String, List<TopicDeployEntity>> topicDeployInfoMap) {
+ // build query result
+ int totalCnt = 0;
+ int totalCfgNumPartCount = 0;
+ int totalRunNumPartCount = 0;
+ boolean isSrvAcceptPublish = false;
+ boolean isSrvAcceptSubscribe = false;
+ boolean isAcceptPublish = false;
+ boolean isAcceptSubscribe = false;
+ ManageStatus manageStatus;
+ Tuple2<Boolean, Boolean> pubSubStatus;
+ TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) {
+ totalCfgNumPartCount = 0;
+ totalRunNumPartCount = 0;
+ isSrvAcceptPublish = false;
+ isSrvAcceptSubscribe = false;
+ isAcceptPublish = false;
+ isAcceptSubscribe = false;
+ TopicCtrlEntity ctrlEntity =
+ metaDataManager.getTopicCtrlByTopicName(entry.getKey());
+ ctrlEntity.toWebJsonStr(sBuffer, true, false);
+ sBuffer.append(",\"deployInfo\":[");
+ int brokerCount = 0;
+ for (TopicDeployEntity entity : entry.getValue()) {
+ if (brokerCount++ > 0) {
+ sBuffer.append(",");
+ }
+ totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
+ entity.toWebJsonStr(sBuffer, true, false);
+ sBuffer.append("\",\"runInfo\":{");
+ BrokerConfEntity brokerConfEntity =
+ metaDataManager.getBrokerConfByBrokerId(entity.getBrokerId());
+ String strManageStatus = "-";
+ if (brokerConfEntity != null) {
+ manageStatus = brokerConfEntity.getManageStatus();
+ strManageStatus = manageStatus.getDescription();
+ pubSubStatus = manageStatus.getPubSubStatus();
+ isAcceptPublish = pubSubStatus.getF0();
+ isAcceptSubscribe = pubSubStatus.getF1();
+ }
+ BrokerInfo broker = new BrokerInfo(entity.getBrokerId(),
+ entity.getBrokerIp(), entity.getBrokerPort());
+ TopicInfo topicInfo =
+ topicPSInfoManager.getTopicInfo(entity.getTopicName(), broker);
+ if (topicInfo == null) {
+ sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
+ .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
+ } else {
+ if (isAcceptPublish) {
+ sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish());
+ if (topicInfo.isAcceptPublish()) {
+ isSrvAcceptPublish = true;
+ }
+ } else {
+ sBuffer.append("\"acceptPublish\":false");
+ }
+ if (isAcceptSubscribe) {
+ sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe());
+ if (topicInfo.isAcceptSubscribe()) {
+ isSrvAcceptSubscribe = true;
+ }
+ } else {
+ sBuffer.append(",\"acceptSubscribe\":false");
+ }
+ totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
+ sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum())
+ .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum())
+ .append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\"");
+ }
+ sBuffer.append("}}");
+ }
+ sBuffer.append("],\"infoCount\":").append(brokerCount)
+ .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
+ .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
+ .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
+ .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount);
+ if (withAuthInfo) {
+ sBuffer.append(",\"authConsumeGroup\":[");
+ List<GroupConsumeCtrlEntity> groupCtrlInfoLst =
+ metaDataManager.getConsumeCtrlByTopic(entry.getKey());
+ int countJ = 0;
+ for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
+ if (countJ++ > 0) {
+ sBuffer.append(",");
+ }
+ groupEntity.toWebJsonStr(sBuffer, true, false);
+ }
+ sBuffer.append("],\"groupCount\":").append(countJ);
+ }
+ sBuffer.append("}");
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
+ }
+
+ /**
+ * Query broker topic config info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminQueryBrokerTopicCfgAndRunInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, false, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ Map<Integer, List<TopicDeployEntity>> queryResult =
+ metaDataManager.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
+ // build query result
+ int dataCount = 0;
+ int totalStoreNum = 0;
+ int totalNumPartCount = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (Map.Entry<Integer, List<TopicDeployEntity>> entry : queryResult.entrySet()) {
+ if (entry.getKey() == null || entry.getValue() == null) {
+ continue;
+ }
+ BrokerConfEntity brokerConf =
+ metaDataManager.getBrokerConfByBrokerId(entry.getKey());
+ if (brokerConf == null) {
+ continue;
+ }
+ BrokerSyncStatusInfo brokerSyncInfo =
+ metaDataManager.getBrokerRunSyncStatusInfo(entry.getKey());
+ List<TopicDeployEntity> topicDeployInfo = entry.getValue();
+ // build return info item
+ if (dataCount++ > 0) {
+ sBuffer.append(",");
+ }
+ totalStoreNum = 0;
+ totalNumPartCount = 0;
+ sBuffer.append("{\"brokerId\":").append(brokerConf.getBrokerId())
+ .append(",\"brokerIp\":\"").append(brokerConf.getBrokerIp())
+ .append("\",\"brokerPort\":").append(brokerConf.getBrokerPort())
+ .append(",\"runInfo\":{");
+ String strManageStatus =
+ brokerConf.getManageStatus().getDescription();
+ Tuple2<Boolean, Boolean> pubSubStatus =
+ brokerConf.getManageStatus().getPubSubStatus();
+ if (brokerSyncInfo == null) {
+ sBuffer.append("\"acceptPublish\":\"-\"")
+ .append(",\"acceptSubscribe\":\"-\"")
+ .append(",\"totalPartitionNum\":\"-\"")
+ .append(",\"totalTopicStoreNum\":\"-\"")
+ .append(",\"brokerManageStatus\":\"-\"");
+ } else {
+ if (pubSubStatus.getF0()) {
+ sBuffer.append("\"acceptPublish\":")
+ .append(brokerSyncInfo.isAcceptPublish());
+ } else {
+ sBuffer.append("\"acceptPublish\":false");
+ }
+ if (pubSubStatus.getF1()) {
+ sBuffer.append(",\"acceptSubscribe\":")
+ .append(brokerSyncInfo.isAcceptSubscribe());
+ } else {
+ sBuffer.append(",\"acceptSubscribe\":false");
+ }
+ for (TopicDeployEntity topicEntity : topicDeployInfo) {
+ if (topicEntity == null) {
+ continue;
+ }
+ totalStoreNum +=
+ topicEntity.getNumTopicStores();
+ totalNumPartCount +=
+ topicEntity.getNumTopicStores() * topicEntity.getNumPartitions();
+ }
+ sBuffer.append(",\"totalPartitionNum\":").append(totalNumPartCount)
+ .append(",\"totalTopicStoreNum\":").append(totalStoreNum)
+ .append(",\"brokerManageStatus\":\"")
+ .append(strManageStatus).append("\"");
+ }
+ sBuffer.append("}}");
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, dataCount);
+ return sBuffer;
+ }
+
+ /**
+ * Query broker's topic-name set info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+ Map<Integer, Set<String>> brokerTopicConfigMap =
+ metaDataManager.getBrokerTopicConfigInfo(brokerIds);
+ // build query result
+ int dataCount = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ for (Map.Entry<Integer, Set<String>> entry : brokerTopicConfigMap.entrySet()) {
+ if (dataCount++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("{\"brokerId\":").append(entry.getKey()).append(",\"topicName\":[");
+ int topicCnt = 0;
+ Set<String> topicSet = entry.getValue();
+ for (String topic : topicSet) {
+ if (topicCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("\"").append(topic).append("\"");
+ }
+ sBuilder.append("],\"topicCount\":").append(topicCnt).append("}");
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, dataCount);
+ return sBuilder;
+ }
+
+ /**
+ * Query topic's broker id set
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.WITHIP, false, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ boolean withIp = (Boolean) result.retData1;
+ Map<String, Map<Integer, String>> topicBrokerConfigMap =
+ metaDataManager.getTopicBrokerConfigInfo(topicNameSet);
+ // build query result
+ int dataCount = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ for (Map.Entry<String, Map<Integer, String>> entry : topicBrokerConfigMap.entrySet()) {
+ if (dataCount++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("{\"topicName\":\"").append(entry.getKey()).append("\",\"brokerInfo\":[");
+ int topicCnt = 0;
+ Map<Integer, String> brokerMap = entry.getValue();
+ if (withIp) {
+ for (Map.Entry<Integer, String> entry1 : brokerMap.entrySet()) {
+ if (topicCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("{\"brokerId\":").append(entry1.getKey())
+ .append(",\"brokerIp\":\"").append(entry1.getValue()).append("\"}");
+ }
+ } else {
+ for (Map.Entry<Integer, String> entry1 : brokerMap.entrySet()) {
+ if (topicCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append(entry1.getKey());
+ }
+ }
+ sBuilder.append("],\"brokerCnt\":").append(topicCnt).append("}");
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, dataCount);
+ return sBuilder;
+ }
+
+ /**
+ * Add new topic record
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminAddTopicEntityInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Tuple3<Long, String, Date> opTupleInfo =
+ (Tuple3<Long, String, Date>) result.getRetData();
+ // check and get topicName info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // check and get brokerId info
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ // check and get cluster default setting info
+ ClusterSettingEntity defClusterSetting =
+ metaDataManager.getClusterDefSetting();
+ if (defClusterSetting == null) {
+ if (!metaDataManager.addClusterDefSetting(sBuilder, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ defClusterSetting = metaDataManager.getClusterDefSetting();
+ }
+ // get and valid TopicPropGroup info
+ if (!WebParameterUtils.getTopicPropInfo(req,
+ defClusterSetting.getClsDefTopicProps(), result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ TopicPropGroup topicPropInfo = (TopicPropGroup) result.getRetData();
+ /* check max message size
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.MAXMSGSIZEINMB, false,
+ TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+ TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+ TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB,
+ result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ int inMaxMsgSizeMB = (int) result.getRetData();
+ */
+ List<TopicProcessResult> retInfo =
+ metaDataManager.addTopicDeployInfo(opTupleInfo.getF0(),
+ opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+ topicNameSet, topicPropInfo, sBuilder, result);
+ return buildRetInfo(retInfo, sBuilder);
+ }
+
+ /**
+ * Add new topic record in batch
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminBatchAddTopicEntityInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Tuple3<Long, String, Date> opTupleInfo =
+ (Tuple3<Long, String, Date>) result.getRetData();
+ // check and get add record map
+ if (!getTopicDeployJsonSetInfo(req, true, true, opTupleInfo.getF0(),
+ opTupleInfo.getF1(), opTupleInfo.getF2(), null, sBuilder, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Map<String, TopicDeployEntity> addRecordMap =
+ (Map<String, TopicDeployEntity>) result.getRetData();
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ for (TopicDeployEntity topicDeployInfo : addRecordMap.values()) {
+ retInfo.add(metaDataManager.addTopicDeployInfo(topicDeployInfo, sBuilder, result));
+ }
+ return buildRetInfo(retInfo, sBuilder);
+ }
+
+ /**
+ * Modify topic info
+ *
+ * @param req
+ * @return
+ * @throws Exception
+ */
+ // #lizard forgives
+ public StringBuilder adminModifyTopicEntityInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Tuple3<Long, String, Date> opTupleInfo =
+ (Tuple3<Long, String, Date>) result.getRetData();
+ // check and get topicName info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // check and get brokerId info
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ // get and valid TopicPropGroup info
+ if (!WebParameterUtils.getTopicPropInfo(req, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
+ // modify records
+ List<TopicProcessResult> retInfo =
+ metaDataManager.modTopicConfig(opTupleInfo.getF0(),
+ opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+ topicNameSet, topicProps, sBuilder, result);
+ return buildRetInfo(retInfo, sBuilder);
+ }
+
+ /**
+ * Delete topic info
+ *
+ * @param req
+ * @return
+ * @throws Exception
+ */
+ public StringBuilder adminDeleteTopicEntityInfo(HttpServletRequest req) {
+ return innModifyTopicStatusEntityInfo(req, TopicStatus.STATUS_TOPIC_SOFT_DELETE);
+ }
+
+ /**
+ * Remove topic info
+ *
+ * @param req
+ * @return
+ * @throws Exception
+ */
+ public StringBuilder adminRemoveTopicEntityInfo(HttpServletRequest req) {
+ return innModifyTopicStatusEntityInfo(req, TopicStatus.STATUS_TOPIC_SOFT_REMOVE);
+ }
+
+ /**
+ * Redo delete topic info
+ *
+ * @param req
+ * @return
+ * @throws Exception
+ */
+ // #lizard forgives
+ public StringBuilder adminRedoDeleteTopicEntityInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Tuple3<Long, String, Date> opTupleInfo =
+ (Tuple3<Long, String, Date>) result.getRetData();
+ // check and get topicName info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // check and get brokerId info
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ // modify records
+ List<TopicProcessResult> retInfo =
+ metaDataManager.modRedoDelTopicConf(opTupleInfo.getF0(),
+ opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+ topicNameSet, sBuffer, result);
+ return buildRetInfo(retInfo, sBuffer);
+ }
+
+
+
+ private boolean getTopicDeployJsonSetInfo(HttpServletRequest req, boolean required,
+ boolean isCreate, long dataVerId,
+ String operator, Date operateDate,
+ List<Map<String, String>> defValue,
+ StringBuilder sBuilder,
+ ProcessResult result) {
+ if (!WebParameterUtils.getJsonArrayParamValue(req,
+ WebFieldDef.TOPICJSONSET, required, defValue, result)) {
+ return result.success;
+ }
+ List<Map<String, String>> deployJsonArray =
+ (List<Map<String, String>>) result.retData1;
+ // check and get cluster default setting info
+ ClusterSettingEntity defClusterSetting =
+ metaDataManager.getClusterDefSetting();
+ if (defClusterSetting == null) {
+ if (!metaDataManager.addClusterDefSetting(sBuilder, result)) {
+ return result.isSuccess();
+ }
+ defClusterSetting = metaDataManager.getClusterDefSetting();
+ }
+ TopicDeployEntity itemConf;
+ Map<String, TopicDeployEntity> addRecordMap = new HashMap<>();
+ // check and get topic deployment configure
+ HashMap<String, TopicDeployEntity> addedRecordMap = new HashMap<>();
+ for (int j = 0; j < deployJsonArray.size(); j++) {
+ Map<String, String> confMap = deployJsonArray.get(j);
+ // check and get operation info
+ long itemDataVerId = dataVerId;
+ String itemCreator = operator;
+ Date itemCreateDate = operateDate;
+ if (!WebParameterUtils.getAUDBaseInfo(confMap, true, result)) {
+ return result.isSuccess();
+ }
+ Tuple3<Long, String, Date> opTupleInfo =
+ (Tuple3<Long, String, Date>) result.getRetData();
+ if (opTupleInfo.getF0() != TBaseConstants.META_VALUE_UNDEFINED) {
+ itemDataVerId = opTupleInfo.getF0();
+ }
+ if (opTupleInfo.getF1() != null) {
+ itemCreator = opTupleInfo.getF1();
+ }
+ if (opTupleInfo.getF2() != null) {
+ itemCreateDate = opTupleInfo.getF2();
+ }
+ // get topicName configure info
+ if (!WebParameterUtils.getStringParamValue(confMap,
+ WebFieldDef.TOPICNAME, true, "", result)) {
+ return result.success;
+ }
+ String topicName = (String) result.retData1;
+ // get broker configure info
+ if (!getBrokerConfInfo(confMap, sBuilder, result)) {
+ return result.isSuccess();
+ }
+ BrokerConfEntity brokerConf =
+ (BrokerConfEntity) result.getRetData();
+ // get and valid TopicPropGroup info
+ if (!WebParameterUtils.getTopicPropInfo(confMap,
+ defClusterSetting.getClsDefTopicProps(), result)) {
+ return result.isSuccess();
+ }
+ TopicPropGroup topicPropInfo = (TopicPropGroup) result.getRetData();
+ /* check max message size
+ if (!WebParameterUtils.getIntParamValue(confMap,
+ WebFieldDef.MAXMSGSIZEINMB, false,
+ TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+ TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+ TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB,
+ result)) {
+ return result.isSuccess();
+ }
+ int inMaxMsgSizeMB = (int) result.getRetData();
+ */
+ // get topicNameId field
+ int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
+ TopicCtrlEntity topicCtrlEntity =
+ metaDataManager.getTopicCtrlByTopicName(topicName);
+ if (topicCtrlEntity != null) {
+ topicNameId = topicCtrlEntity.getTopicId();
+ }
+ itemConf = new TopicDeployEntity(itemDataVerId, itemCreator, itemCreateDate);
+ itemConf.setTopicDeployInfo(brokerConf.getBrokerId(),
+ brokerConf.getBrokerIp(), brokerConf.getBrokerPort(), topicName);
+ itemConf.updModifyInfo(topicNameId,
+ TBaseConstants.META_VALUE_UNDEFINED, null,
+ TopicStatus.STATUS_TOPIC_OK, topicPropInfo);
+ addRecordMap.put(itemConf.getRecordKey(), itemConf);
+ }
+ // check result
+ if (addedRecordMap.isEmpty()) {
+ if (isCreate) {
+ result.setFailResult(sBuilder
+ .append("Not found record in ")
+ .append(WebFieldDef.TOPICJSONSET.name)
+ .append(" parameter!").toString());
+ sBuilder.delete(0, sBuilder.length());
+ return result.isSuccess();
+ }
+ }
+ result.setSuccResult(addedRecordMap);
+ return result.isSuccess();
+ }
+
+ private boolean getBrokerConfInfo(Map<String, String> keyValueMap,
+ StringBuilder sBuilder, ProcessResult result) {
+ // get brokerId
+ if (!WebParameterUtils.getIntParamValue(keyValueMap,
+ WebFieldDef.BROKERID, true, 0, 0, result)) {
+ return result.success;
+ }
+ int brokerId = (int) result.getRetData();
+ BrokerConfEntity curEntity =
+ metaDataManager.getBrokerConfByBrokerId(brokerId);
+ if (curEntity == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ sBuilder.append("Not found broker configure by ")
+ .append(WebFieldDef.BROKERID.name).append(" = ").append(brokerId)
+ .append(", please create the broker's configure first!").toString());
+ return result.isSuccess();
+ }
+ result.setSuccResult(curEntity);
+ return result.isSuccess();
+ }
+
+ private StringBuilder buildRetInfo(List<TopicProcessResult> retInfo,
+ StringBuilder sBuilder) {
+ int totalCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ for (TopicProcessResult entry : retInfo) {
+ if (totalCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("{\"brokerId\":").append(entry.getBrokerId())
+ .append("{\"topicName\":\"").append(entry.getTopicName()).append("\"")
+ .append(",\"success\":").append(entry.isSuccess())
+ .append(",\"errCode\":").append(entry.getErrCode())
+ .append(",\"errInfo\":\"").append(entry.getErrInfo()).append("\"}");
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
+ return sBuilder;
+ }
+
+
+ /**
+ * Internal method to perform deletion and removal of topic
+ *
+ * @param req
+ * @param topicStatus
+ * @return
+ */
+ // #lizard forgives
+ private StringBuilder innModifyTopicStatusEntityInfo(HttpServletRequest req,
+ TopicStatus topicStatus) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Tuple3<Long, String, Date> opTupleInfo =
+ (Tuple3<Long, String, Date>) result.getRetData();
+ // check and get topicName info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // check and get brokerId info
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ // modify records
+ List<TopicProcessResult> retInfo =
+ metaDataManager.modDelOrRmvTopicConf(opTupleInfo.getF0(),
+ opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+ topicNameSet, topicStatus, sBuffer, result);
+ return buildRetInfo(retInfo, sBuffer);
+ }
+
+}