You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by yu...@apache.org on 2021/04/26 12:20:17 UTC
[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-574]Adjust
WebAdminGroupCtrlHandler class implementation (#459)
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new 4ba691f [INLONG-574]Adjust WebAdminGroupCtrlHandler class implementation (#459)
4ba691f is described below
commit 4ba691f6d7ae5e7616a9a05a91e2d68021f06a8e
Author: gosonzhang <go...@apache.org>
AuthorDate: Mon Apr 26 20:20:08 2021 +0800
[INLONG-574]Adjust WebAdminGroupCtrlHandler class implementation (#459)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../tubemq/server/common/fielddef/WebFieldDef.java | 13 +-
.../server/common/statusdef/TopicStsChgType.java | 51 +
.../tubemq/server/common/utils/ProcessResult.java | 2 +-
.../server/common/utils/WebParameterUtils.java | 21 +
.../server/master/metamanage/MetaDataManager.java | 578 ++++----
.../metastore/BdbMetaStoreServiceImpl.java | 6 +-
.../metamanage/metastore/MetaStoreService.java | 4 +-
.../metastore/dao/entity/BaseEntity.java | 66 +-
.../metastore/dao/entity/GroupResCtrlEntity.java | 2 +-
.../metastore/dao/entity/TopicDeployEntity.java | 19 +-
.../dao/mapper/GroupConsumeCtrlMapper.java | 4 +-
.../metastore/dao/mapper/GroupResCtrlMapper.java | 9 +-
.../metastore/dao/mapper/TopicDeployMapper.java | 8 +
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 2 +-
.../bdbimpl/BdbGroupConsumeCtrlMapperImpl.java | 72 +-
.../impl/bdbimpl/BdbGroupResCtrlMapperImpl.java | 25 +-
.../impl/bdbimpl/BdbTopicDeployMapperImpl.java | 114 +-
.../server/master/web/action/screen/Webapi.java | 12 +-
.../web/handler/WebAdminGroupCtrlHandler.java | 877 +++++------
.../web/handler/WebBrokerTopicConfHandler.java | 1549 --------------------
.../web/handler/WebGroupConsumeCtrlHandler.java | 31 +-
.../master/web/handler/WebGroupResCtrlHandler.java | 19 +-
.../master/web/handler/WebMasterInfoHandler.java | 5 +
.../master/web/handler/WebTopicCtrlHandler.java | 2 +-
.../master/web/handler/WebTopicDeployHandler.java | 801 +++++-----
25 files changed, 1491 insertions(+), 2801 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 269cd6c..c20a272 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -218,8 +218,8 @@ public enum WebFieldDef {
RegexDef.TMP_IPV4ADDRESS),
ISRESERVEDDATA(77, "isReservedData", "isRsvDt",
WebFieldType.BOOLEAN, "Whether to keep topic data in the broker"),
- WITHCTRLINFO(78, "ctrlData", "cD",
- WebFieldType.BOOLEAN, "With topic control data info."),
+ WITHGROUPAUTHINFO(78, "withGroupAuthInfo", "wGAI",
+ WebFieldType.BOOLEAN, "With topic group authorize info."),
WITHDEPLOYINFO(79, "withDeployInfo", "wDI",
WebFieldType.BOOLEAN, "With topic deploy info."),
@@ -227,7 +227,14 @@ public enum WebFieldDef {
"The topic control info set that needs to be added or modified"),
GROUPRESCTRLSET(81, "groupResCtrlJsonSet", "gResCtrlSet",
WebFieldType.JSONSET,
- "The group resource control info set that needs to be added or modified");
+ "The group resource control info set that needs to be added or modified"),
+ @Deprecated
+ OLDALWDBCRATE(82, "allowedBClientRate", "abcr", WebFieldType.INT,
+ "Allowed broker client rate, same as alwdBrokerClientRate", RegexDef.TMP_NUMBER),
+ @Deprecated
+ GROUPJSONSET(83, "groupNameJsonSet", "gJsonSet", WebFieldType.JSONSET,
+ "The black list group set that needs to be added or modified");
+
public final int id;
public final String name;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStsChgType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStsChgType.java
new file mode 100644
index 0000000..4687fdb
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStsChgType.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.statusdef;
+
+public enum TopicStsChgType {
+ STATUS_CHANGE_SOFT_DELETE(0, "Soft deleted"),
+ STATUS_CHANGE_REMOVE(1, "Soft removed"),
+ STATUS_CHANGE_REDO_SFDEL(2, "Redo soft delete");
+
+ private int code;
+ private String description;
+
+
+ TopicStsChgType(int code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public static TopicStsChgType valueOf(int code) {
+ for (TopicStsChgType changeType : TopicStsChgType.values()) {
+ if (changeType.getCode() == code) {
+ return changeType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("unknown status change code %s", code));
+ }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
index 8eebf61..f0da102 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
@@ -65,7 +65,7 @@ public class ProcessResult {
public void setSuccResult(Object retData) {
this.success = true;
- this.errInfo = "";
+ this.errInfo = "Ok!";
this.errCode = TErrCodeConstants.SUCCESS;
this.retData1 = retData;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index ea3b153..0154bf1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -1026,6 +1026,27 @@ public class WebParameterUtils {
return result.success;
}
+
+ public static boolean isFilterSetFullIncluded(
+ Set<String> qryFilterSet, String confFilterStr) {
+ if (qryFilterSet == null || qryFilterSet.isEmpty()) {
+ return true;
+ }
+ if (confFilterStr == null
+ || (confFilterStr.length() == 2
+ && confFilterStr.equals(TServerConstants.BLANK_FILTER_ITEM_STR))) {
+ return false;
+ }
+ boolean allInc = true;
+ for (String filterCond : qryFilterSet) {
+ if (!confFilterStr.contains(filterCond)) {
+ allInc = false;
+ break;
+ }
+ }
+ return allInc;
+ }
+
/**
* Parse the parameter value from an json dict
*
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index adee45d..d34af08 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -44,6 +44,7 @@ import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.fileconfig.MasterReplicationConfig;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.apache.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.tubemq.server.common.statusdef.TopicStsChgType;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.bdbstore.MasterGroupStatus;
@@ -1007,323 +1008,239 @@ public class MetaDataManager implements Server {
// ////////////////////////////////////////////////////////////////////////////
- public List<TopicProcessResult> addTopicDeployInfo(BaseEntity opEntity,
- Set<Integer> brokerIdSet,
- Set<String> topicNameSet,
- TopicPropGroup topicPropInfo,
- StringBuilder sBuilder,
- ProcessResult result) {
- TopicDeployEntity deployConf;
- List<TopicProcessResult> retInfo = new ArrayList<>();
- // add topic control info
- addIfAbsentTopicCtrlConf(topicNameSet,
- opEntity.getCreateUser(), sBuilder, result);
- result.clear();
- // add topic deployment record
- for (Integer brokerId : brokerIdSet) {
- BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
- if (brokerConf == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
- retInfo.add(new TopicProcessResult(brokerId, "", result));
- continue;
- }
-
- for (String topicName : topicNameSet) {
- TopicDeployEntity deployInfo = getTopicConfInfo(brokerId, topicName);
- if (deployInfo != null) {
- result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- DataOpErrCode.DERR_EXISTED.getDescription());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
- }
- deployConf = new TopicDeployEntity(opEntity, brokerConf.getBrokerId(),
- brokerConf.getBrokerIp(), brokerConf.getBrokerPort(), topicName);
- deployConf.setTopicProps(brokerConf.getTopicProps());
- deployConf.updModifyInfo(opEntity.getDataVerId(),
- TBaseConstants.META_VALUE_UNDEFINED,
- TBaseConstants.META_VALUE_UNDEFINED, null,
- TopicStatus.STATUS_TOPIC_OK, topicPropInfo);
- metaStoreService.addTopicConf(deployConf, sBuilder, result);
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- }
+ public TopicProcessResult addOrUpdTopicDeployInfo(boolean isAddOp, BaseEntity opEntity,
+ int brokerId, String topicName,
+ TopicStatus deployStatus,
+ TopicPropGroup topicPropInfo,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ // check broker configure exist
+ BrokerConfEntity brokerConf =
+ getBrokerConfByBrokerId(brokerId);
+ if (brokerConf == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ sBuffer.append("Not found broker configure record by brokerId=")
+ .append(brokerId)
+ .append(", please create the broker's configure first!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return new TopicProcessResult(brokerId, "", result);
}
- return retInfo;
+ TopicDeployEntity deployConf =
+ new TopicDeployEntity(opEntity, brokerId, topicName);
+ deployConf.setTopicProps(brokerConf.getTopicProps());
+ deployConf.updModifyInfo(opEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED, brokerConf.getBrokerPort(),
+ brokerConf.getBrokerIp(), deployStatus, topicPropInfo);
+ return addOrUpdTopicDeployInfo(isAddOp, deployConf, sBuffer, result);
}
- public TopicProcessResult addTopicDeployInfo(TopicDeployEntity deployEntity,
- StringBuilder sBuilder,
- ProcessResult result) {
+ public TopicProcessResult addOrUpdTopicDeployInfo(boolean isAddOp,
+ TopicDeployEntity deployEntity,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check broker configure exist
BrokerConfEntity brokerConf =
getBrokerConfByBrokerId(deployEntity.getBrokerId());
if (brokerConf == null) {
result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ sBuffer.append("Not found broker configure record by brokerId=")
+ .append(deployEntity.getBrokerId())
+ .append(", please create the broker's configure first!").toString());
+ sBuffer.delete(0, sBuffer.length());
return new TopicProcessResult(deployEntity.getBrokerId(), "", result);
}
- // add topic control info
- addIfAbsentTopicCtrlConf(deployEntity, sBuilder, result);
- // add topic deployment record
- TopicDeployEntity curDeployInfo =
- metaStoreService.getTopicConfByeRecKey(deployEntity.getRecordKey());
- if (curDeployInfo != null) {
- result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- DataOpErrCode.DERR_EXISTED.getDescription());
+ // add topic control configure
+ if (!addIfAbsentTopicCtrlConf(deployEntity.getTopicName(),
+ deployEntity.getModifyUser(), sBuffer, result)) {
return new TopicProcessResult(deployEntity.getBrokerId(),
deployEntity.getTopicName(), result);
}
- metaStoreService.addTopicConf(deployEntity, sBuilder, result);
- return new TopicProcessResult(deployEntity.getBrokerId(),
- deployEntity.getTopicName(), result);
- }
-
- /**
- * Modify topic config
- *
- * @param result the process result return
- * @return true if success otherwise false
- */
- public List<TopicProcessResult> modTopicConfig(BaseEntity opEntity,
- Set<Integer> brokerIdSet,
- Set<String> topicNameSet,
- TopicPropGroup topicProps,
- StringBuilder sBuilder,
- ProcessResult result) {
- List<TopicProcessResult> retInfo = new ArrayList<>();
- // add topic control info
- addIfAbsentTopicCtrlConf(topicNameSet,
- opEntity.getModifyUser(), sBuilder, result);
- result.clear();
- // add topic deployment record
- TopicDeployEntity curEntity;
- TopicDeployEntity newEntity;
- for (Integer brokerId : brokerIdSet) {
- BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
- if (brokerConf == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
- retInfo.add(new TopicProcessResult(brokerId, "", result));
- continue;
- }
- for (String topicName : topicNameSet) {
- curEntity = getTopicConfInfo(brokerId, topicName);
- if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
- }
- if (!curEntity.isValidTopicStatus()) {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- sBuilder.append("Topic of ").append(topicName)
- .append("is deleted softly in brokerId=").append(brokerId)
+ // add or update topic deployment record
+ TopicDeployEntity curEntity =
+ metaStoreService.getTopicConfByeRecKey(deployEntity.getRecordKey());
+ if (isAddOp) {
+ if (curEntity == null) {
+ metaStoreService.addTopicConf(deployEntity, sBuffer, result);
+ } else {
+ if (curEntity.isValidTopicStatus()) {
+ result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+ sBuffer.append("Duplicate topic deploy configure, exist record is: ")
+ .append("brokerId=").append(curEntity.getBrokerId())
+ .append(", topicName=").append(curEntity.getTopicName())
+ .toString());
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+ sBuffer.append("Topic of ").append(curEntity.getTopicName())
+ .append(" is deleted softly in brokerId=")
+ .append(curEntity.getBrokerId())
.append(", please resume the record or hard removed first!")
.toString());
- sBuilder.delete(0, sBuilder.length());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
- }
- if (topicProps != null) {
- if (topicProps.getNumPartitions() != TBaseConstants.META_VALUE_UNDEFINED
- && topicProps.getNumPartitions() < curEntity.getNumPartitions()) {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
- sBuilder.append("Partition value is less than before,")
- .append("please confirm the configure first! brokerId=")
- .append(curEntity.getBrokerId()).append(", topicName=")
- .append(curEntity.getTopicName())
- .append(", old Partition value is ")
- .append(curEntity.getNumPartitions())
- .append(", new Partition value is ")
- .append(topicProps.getNumPartitions()).toString());
- sBuilder.delete(0, sBuilder.length());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
- }
- if (topicProps.getNumTopicStores() != TBaseConstants.META_VALUE_UNDEFINED
- && topicProps.getNumTopicStores() < curEntity.getNumTopicStores()) {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
- sBuilder.append("TopicStores value is less than before,")
- .append("please confirm the configure first! brokerId=")
- .append(curEntity.getBrokerId()).append(", topicName=")
- .append(curEntity.getTopicName())
- .append(", old TopicStores value is ")
- .append(curEntity.getNumTopicStores())
- .append(", new TopicStores value is ")
- .append(topicProps.getNumTopicStores()).toString());
- sBuilder.delete(0, sBuilder.length());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
- }
- }
- newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(opEntity);
- if (!newEntity.updModifyInfo(opEntity.getDataVerId(),
- TBaseConstants.META_VALUE_UNDEFINED,
- TBaseConstants.META_VALUE_UNDEFINED,
- null, null, topicProps)) {
- result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
- sBuilder.append("Data not changed for brokerId=")
- .append(curEntity.getBrokerId()).append(", topicName=")
- .append(curEntity.getTopicName()).toString());
- sBuilder.delete(0, sBuilder.length());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
}
- metaStoreService.updTopicConf(newEntity, sBuilder, result);
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ sBuffer.delete(0, sBuffer.length());
}
- }
- return retInfo;
- }
-
- /**
- * Modify topic config
- *
- * @param result the process result return
- * @return true if success otherwise false
- */
- public List<TopicProcessResult> modDelOrRmvTopicConf(BaseEntity opEntity,
- Set<Integer> brokerIdSet,
- Set<String> topicNameSet,
- TopicStatus topicStatus,
- StringBuilder sBuilder,
- ProcessResult result) {
- TopicDeployEntity curEntity;
- TopicDeployEntity newEntity;
- List<TopicProcessResult> retInfo = new ArrayList<>();
- // add topic deployment record
- for (Integer brokerId : brokerIdSet) {
- BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
- if (brokerConf == null) {
+ return new TopicProcessResult(deployEntity.getBrokerId(),
+ deployEntity.getTopicName(), result);
+ } else {
+ // update current deployment configure
+ if (curEntity == null) {
result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
- retInfo.add(new TopicProcessResult(brokerId, "", result));
- continue;
+ sBuffer.append("Not found the topic ").append(deployEntity.getTopicName())
+ .append("'s deploy configure in broker=")
+ .append(deployEntity.getBrokerId()) // curEntity is null in this branch; use the request's keys
+ .append(", please confirm the configure first!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return new TopicProcessResult(deployEntity.getBrokerId(),
+ deployEntity.getTopicName(), result);
}
- for (String topicName : topicNameSet) {
- curEntity = getTopicConfInfo(brokerId, topicName);
- if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
- }
- if (curEntity.isAcceptPublish()
- || curEntity.isAcceptSubscribe()) { // still accept publish and subscribe
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- sBuilder.append("The topic ").append(topicName)
- .append("'s acceptPublish and acceptSubscribe status must be false in broker=")
- .append(brokerId).append(" before topic deleted!").toString());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
- }
- if ((topicStatus == TopicStatus.STATUS_TOPIC_SOFT_DELETE
- && !curEntity.isValidTopicStatus())
- || (topicStatus == TopicStatus.STATUS_TOPIC_SOFT_REMOVE
- && curEntity.getTopicStatus() != TopicStatus.STATUS_TOPIC_SOFT_DELETE)) {
- result.setSuccResult("");
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
+ // check deploy status
+ if (!curEntity.isValidTopicStatus()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuffer.append("Topic of ").append(curEntity.getTopicName())
+ .append(" is deleted softly in brokerId=")
+ .append(curEntity.getBrokerId())
+ .append(", please resume the record or hard removed first!")
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ return new TopicProcessResult(deployEntity.getBrokerId(),
+ deployEntity.getTopicName(), result);
+ }
+ // check if shrink data store block
+ if (deployEntity.getTopicProps() != null) {
+ if (deployEntity.getNumPartitions() != TBaseConstants.META_VALUE_UNDEFINED
+ && deployEntity.getNumPartitions() < curEntity.getNumPartitions()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ sBuffer.append("Partition value is less than before,")
+ .append("please confirm the configure first! brokerId=")
+ .append(curEntity.getBrokerId()).append(", topicName=")
+ .append(curEntity.getTopicName())
+ .append(", old Partition value is ")
+ .append(curEntity.getNumPartitions())
+ .append(", new Partition value is ")
+ .append(deployEntity.getNumPartitions()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return new TopicProcessResult(deployEntity.getBrokerId(),
+ deployEntity.getTopicName(), result);
}
- newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(opEntity);
- if (!newEntity.updModifyInfo(opEntity.getDataVerId(),
- TBaseConstants.META_VALUE_UNDEFINED,
- TBaseConstants.META_VALUE_UNDEFINED,
- null, topicStatus, null)) {
- result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
- sBuilder.append("Data not changed for brokerId=")
+ if (deployEntity.getNumTopicStores() != TBaseConstants.META_VALUE_UNDEFINED
+ && deployEntity.getNumTopicStores() < curEntity.getNumTopicStores()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ sBuffer.append("TopicStores value is less than before,")
+ .append("please confirm the configure first! brokerId=")
.append(curEntity.getBrokerId()).append(", topicName=")
- .append(curEntity.getTopicName()).toString());
- sBuilder.delete(0, sBuilder.length());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
+ .append(curEntity.getTopicName())
+ .append(", old TopicStores value is ")
+ .append(curEntity.getNumTopicStores())
+ .append(", new TopicStores value is ")
+ .append(deployEntity.getNumTopicStores()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return new TopicProcessResult(deployEntity.getBrokerId(),
+ deployEntity.getTopicName(), result);
}
- metaStoreService.updTopicConf(newEntity, sBuilder, result);
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
}
+ TopicDeployEntity newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(deployEntity);
+ if (!newEntity.updModifyInfo(deployEntity.getDataVerId(),
+ deployEntity.getTopicId(), deployEntity.getBrokerPort(),
+ deployEntity.getBrokerIp(), deployEntity.getDeployStatus(),
+ deployEntity.getTopicProps())) {
+ result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+ sBuffer.append("Data not changed for brokerId=")
+ .append(curEntity.getBrokerId()).append(", topicName=")
+ .append(curEntity.getTopicName()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ } else {
+ metaStoreService.updTopicConf(newEntity, sBuffer, result);
+ }
+ return new TopicProcessResult(deployEntity.getBrokerId(),
+ deployEntity.getTopicName(), result);
}
- return retInfo;
}
/**
- * Modify topic config
+ * Modify topic deploy status info
*
* @param result the process result return
* @return true if success otherwise false
*/
- public List<TopicProcessResult> modRedoDelTopicConf(BaseEntity opEntity,
- Set<Integer> brokerIdSet,
- Set<String> topicNameSet,
- StringBuilder sBuilder,
- ProcessResult result) {
- TopicDeployEntity curEntity;
- TopicDeployEntity newEntity;
- List<TopicProcessResult> retInfo = new ArrayList<>();
- // add topic deployment record
- for (Integer brokerId : brokerIdSet) {
- BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
- if (brokerConf == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
- retInfo.add(new TopicProcessResult(brokerId, "", result));
- continue;
+ public TopicProcessResult updTopicDeployStatusInfo(BaseEntity opEntity, int brokerId,
+ String topicName, TopicStsChgType chgType,
+ StringBuilder sBuffer, ProcessResult result) {
+ // get broker configure record
+ BrokerConfEntity brokerConf = getBrokerConfByBrokerId(brokerId);
+ if (brokerConf == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ sBuffer.append("Not found broker configure record by brokerId=")
+ .append(brokerId)
+ .append(", please create the broker's configure first!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return new TopicProcessResult(brokerId, topicName, result);
+ }
+ // get topic deploy configure record
+ TopicDeployEntity curEntity = getTopicConfInfo(brokerId, topicName);
+ if (curEntity == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ sBuffer.append("Not found the topic ").append(topicName)
+ .append("'s deploy configure in broker=").append(brokerId)
+ .append(", please confirm the configure first!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return new TopicProcessResult(brokerId, topicName, result);
+ }
+ // check deploy status if still accept publish and subscribe
+ if (curEntity.isAcceptPublish()
+ || curEntity.isAcceptSubscribe()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuffer.append("The topic ").append(topicName)
+ .append("'s acceptPublish and acceptSubscribe status must be false in broker=")
+ .append(brokerId).append(" before topic deleted!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return new TopicProcessResult(brokerId, topicName, result);
+ }
+ TopicStatus topicStatus;
+ if (chgType == TopicStsChgType.STATUS_CHANGE_SOFT_DELETE) {
+ if (!curEntity.isValidTopicStatus()) {
+ result.setSuccResult("");
+ return new TopicProcessResult(brokerId, topicName, result);
}
- for (String topicName : topicNameSet) {
- curEntity = getTopicConfInfo(brokerId, topicName);
- if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
- }
- if (curEntity.isAcceptPublish()
- || curEntity.isAcceptSubscribe()) { // still accept publish and subscribe
+ topicStatus = TopicStatus.STATUS_TOPIC_SOFT_DELETE;
+ } else if (chgType == TopicStsChgType.STATUS_CHANGE_REMOVE) {
+ if (curEntity.getTopicStatus() != TopicStatus.STATUS_TOPIC_SOFT_DELETE) {
+ result.setSuccResult("");
+ return new TopicProcessResult(brokerId, topicName, result);
+ }
+ topicStatus = TopicStatus.STATUS_TOPIC_SOFT_REMOVE;
+ } else {
+ if (curEntity.getTopicStatus() != TopicStatus.STATUS_TOPIC_SOFT_DELETE) {
+ if (curEntity.isValidTopicStatus()) {
+ result.setSuccResult(null);
+ } else {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- sBuilder.append("The topic ").append(topicName)
- .append("'s acceptPublish and acceptSubscribe status must be false in broker=")
- .append(brokerId).append(" before topic deleted!").toString());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
- }
- if (curEntity.getTopicStatus() != TopicStatus.STATUS_TOPIC_SOFT_DELETE) {
- if (curEntity.isValidTopicStatus()) {
- result.setSuccResult(null);
- } else {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- sBuilder.append("Topic of ").append(topicName)
- .append("is in removing flow in brokerId=")
- .append(curEntity.getBrokerId())
- .append(", please wait until remove process finished!")
- .toString());
- sBuilder.delete(0, sBuilder.length());
- }
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
- }
- newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(opEntity);
- if (!newEntity.updModifyInfo(opEntity.getDataVerId(),
- TBaseConstants.META_VALUE_UNDEFINED,
- TBaseConstants.META_VALUE_UNDEFINED, null,
- TopicStatus.STATUS_TOPIC_OK, null)) {
- result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
- sBuilder.append("Data not changed for brokerId=")
- .append(curEntity.getBrokerId()).append(", topicName=")
- .append(curEntity.getTopicName()).toString());
- sBuilder.delete(0, sBuilder.length());
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
- continue;
+ sBuffer.append("Topic of ").append(topicName)
+ .append(" is in removing flow in brokerId=")
+ .append(curEntity.getBrokerId())
+ .append(", please wait until remove process finished!")
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
}
- metaStoreService.updTopicConf(newEntity, sBuilder, result);
- retInfo.add(new TopicProcessResult(brokerId, topicName, result));
+ return new TopicProcessResult(brokerId, topicName, result);
}
+ topicStatus = TopicStatus.STATUS_TOPIC_OK;
}
- return retInfo;
+ TopicDeployEntity newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(opEntity);
+ if (newEntity.updModifyInfo(opEntity.getDataVerId(),
+ curEntity.getTopicId(), brokerConf.getBrokerPort(),
+ brokerConf.getBrokerIp(), topicStatus, null)) {
+ metaStoreService.updTopicConf(newEntity, sBuffer, result);
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+ sBuffer.append("Data not changed for brokerId=")
+ .append(curEntity.getBrokerId()).append(", topicName=")
+ .append(curEntity.getTopicName()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+ return new TopicProcessResult(brokerId, topicName, result);
}
-
/**
* Get broker topic entity, if query entity is null, return all topic entity
*
@@ -1603,10 +1520,10 @@ public class MetaDataManager implements Server {
* @param topicNameSet the topic name will be add
* @param operator the topic name id will be add
* @param operator operator
- * @param strBuffer the print info string buffer
+ * @param sBuffer the print info string buffer
*/
- public void addIfAbsentTopicCtrlConf(Set<String> topicNameSet, String operator,
- StringBuilder strBuffer, ProcessResult result) {
+ public boolean addIfAbsentTopicCtrlConf(Set<String> topicNameSet, String operator,
+ StringBuilder sBuffer, ProcessResult result) {
TopicCtrlEntity curEntity;
int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
ClusterSettingEntity defSetting = metaStoreService.getClusterConfig();
@@ -1620,10 +1537,40 @@ public class MetaDataManager implements Server {
}
curEntity = new TopicCtrlEntity(topicName,
TBaseConstants.META_VALUE_UNDEFINED, maxMsgSizeInMB, operator);
- metaStoreService.addTopicCtrlConf(curEntity, strBuffer, result);
+ if (!metaStoreService.addTopicCtrlConf(curEntity, sBuffer, result)) {
+ return result.isSuccess();
+ }
}
+ result.setSuccResult(null);
+ return result.isSuccess();
}
+ /**
+ * Add if absent topic control configure info
+ *
+ * @param topicName the topic name will be add
+ * @param operator operator
+ * @param sBuffer the print info string buffer
+ * @param result the process result return
+ */
+ public boolean addIfAbsentTopicCtrlConf(String topicName, String operator,
+ StringBuilder sBuffer, ProcessResult result) {
+ int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
+ ClusterSettingEntity defSetting = metaStoreService.getClusterConfig();
+ if (defSetting != null) {
+ maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
+ }
+ TopicCtrlEntity curEntity =
+ metaStoreService.getTopicCtrlConf(topicName);
+ if (curEntity == null) {
+ curEntity = new TopicCtrlEntity(topicName,
+ TBaseConstants.META_VALUE_UNDEFINED, maxMsgSizeInMB, operator);
+ metaStoreService.addTopicCtrlConf(curEntity, sBuffer, result);
+ } else {
+ result.setSuccResult(null);
+ }
+ return result.isSuccess();
+ }
public TopicCtrlEntity getTopicCtrlByTopicName(String topicName) {
return this.metaStoreService.getTopicCtrlConf(topicName);
@@ -1775,6 +1722,63 @@ public class MetaDataManager implements Server {
}
/**
+ * Enable or disable consumption in a group's resource control configure info
+ *
+ * @param opEntity the base entity carrying the operation info (data version id, operator)
+ * @param groupName the target group name
+ * @param sBuffer the print info string buffer
+ * @param result the process result returned
+ * @return the group process result built from the group name and the process result
+ */
+ public GroupProcessResult enOrDisConsumeCtrlConf(BaseEntity opEntity, String groupName,
+ boolean enableConsume, String disReason,
+ StringBuilder sBuffer, ProcessResult result) {
+ GroupResCtrlEntity newEntity = new GroupResCtrlEntity(opEntity, groupName);
+ newEntity.updModifyInfo(opEntity.getDataVerId(), enableConsume, disReason,
+ null, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, null,
+ TBaseConstants.META_VALUE_UNDEFINED, null);
+ return addOrUpdGroupResCtrlConf(newEntity, sBuffer, result);
+ }
+
+ /**
+ * Add the group resource control configure info, or update it if already present
+ *
+ * @param entity the group resource control info entity to be added or updated
+ * @param sBuffer the print info string buffer
+ * @param result the process result returned
+ * @return the group process result built from the group name and the process result
+ */
+ public GroupProcessResult addOrUpdGroupResCtrlConf(GroupResCtrlEntity entity,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ GroupResCtrlEntity newEntity;
+ GroupResCtrlEntity curEntity =
+ metaStoreService.getGroupResCtrlConf(entity.getGroupName());
+ if (curEntity == null) {
+ newEntity = new GroupResCtrlEntity(entity, entity.getGroupName());
+ newEntity.fillDefaultValue();
+ newEntity.updModifyInfo(entity.getDataVerId(), entity.isEnableConsume(),
+ entity.getDisableReason(), entity.isEnableResCheck(),
+ entity.getAllowedBrokerClientRate(), entity.getQryPriorityId(),
+ entity.isFlowCtrlEnable(), entity.getRuleCnt(), entity.getFlowCtrlInfo());
+ metaStoreService.addGroupResCtrlConf(newEntity, sBuffer, result);
+ } else {
+ newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(entity);
+ if (newEntity.updModifyInfo(entity.getDataVerId(), entity.isEnableConsume(),
+ entity.getDisableReason(), entity.isEnableResCheck(),
+ entity.getAllowedBrokerClientRate(), entity.getQryPriorityId(),
+ entity.isFlowCtrlEnable(), entity.getRuleCnt(), entity.getFlowCtrlInfo())) {
+ metaStoreService.updGroupResCtrlConf(newEntity, sBuffer, result);
+ } else {
+ result.setSuccResult(null);
+ }
+ }
+ return new GroupProcessResult(entity.getGroupName(), null, result);
+ }
+
+ /**
* Delete group resource control configure
*
* @param operator operator
@@ -1988,13 +1992,13 @@ public class MetaDataManager implements Server {
/**
* Get group consume control configure for topic & group set
*
- * @param groupNameSet the topic name
- * @param topicNameSet the group name
+ * @param groupSet the group name set
+ * @param topicSet the topic name set
* @return group consume control record
*/
public Map<String, List<GroupConsumeCtrlEntity>> getGroupConsumeCtrlConf(
- Set<String> groupNameSet, Set<String> topicNameSet) {
- return metaStoreService.getConsumeCtrlByGroupAndTopic(groupNameSet, topicNameSet);
+ Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
+ return metaStoreService.getConsumeCtrlInfoMap(groupSet, topicSet, qryEntry);
}
// //////////////////////////////////////////////////////////////////////////////
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
index 3018667..17d0b43 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
@@ -912,9 +912,9 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
- public Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupAndTopic(
- Set<String> groupNameSet, Set<String> topicNameSet) {
- return groupConsumeCtrlMapper.getConsumeCtrlByGroupAndTopic(groupNameSet, topicNameSet);
+ public Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(
+ Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
+ return groupConsumeCtrlMapper.getConsumeCtrlInfoMap(groupSet, topicSet, qryEntry);
}
@Override
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 6524471..2312058 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -326,8 +326,8 @@ public interface MetaStoreService extends KeepAlive, Server {
GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName);
- Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupAndTopic(
- Set<String> groupNameSet, Set<String> topicNameSet);
+ Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(
+ Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry);
List<GroupConsumeCtrlEntity> getConsumeCtrlByTopicName(String topicName);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
index 5645174..65bbaeb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
@@ -34,12 +34,13 @@ public class BaseEntity implements Serializable, Cloneable {
private long dataVersionId =
TServerConstants.DEFAULT_DATA_VERSION; // 0: default version, other: version
private long serialId = TBaseConstants.META_VALUE_UNDEFINED;
- private String createUser = ""; //create user
- private Date createDate = null; //create date
- private String modifyUser = ""; //modify user
- private Date modifyDate = null; //modify date
- private String attributes = ""; //attribute info
-
+ private String createUser = ""; // create user
+ private Date createDate = null; // create date
+ private String modifyUser = ""; // modify user
+ private Date modifyDate = null; // modify date
+ private String attributes = ""; // attribute info
+ private String createDateStr = ""; // create date string
+ private String modifyDateStr = ""; // modify date string
public BaseEntity() {
@@ -57,9 +58,9 @@ public class BaseEntity implements Serializable, Cloneable {
public BaseEntity(BaseEntity other) {
this.dataVersionId = other.dataVersionId;
this.createUser = other.createUser;
- this.createDate = other.createDate;
+ this.setCreateDate(other.createDate);
this.modifyUser = other.modifyUser;
- this.modifyDate = other.modifyDate;
+ this.setModifyDate(other.modifyDate);
}
public BaseEntity(long dataVersionId, String createUser, Date createDate) {
@@ -77,15 +78,9 @@ public class BaseEntity implements Serializable, Cloneable {
String modifyUser, Date modifyDate) {
this.dataVersionId = dataVersionId;
this.createUser = createUser;
- this.createDate = createDate;
+ this.setCreateDate(createDate);
this.modifyUser = modifyUser;
- this.modifyDate = modifyDate;
- }
-
- public void setModifyInfo(long newDataVerId, String operator, Date opDate) {
- this.dataVersionId = newDataVerId;
- this.modifyUser = operator;
- this.modifyDate = opDate;
+ this.setModifyDate(modifyDate);
}
public boolean updBaseModifyInfo(BaseEntity opInfoEntity) {
@@ -98,7 +93,7 @@ public class BaseEntity implements Serializable, Cloneable {
if (opInfoEntity.getCreateDate() != null
&& !Objects.equals(createDate, opInfoEntity.getCreateDate())) {
changed = true;
- this.createDate = opInfoEntity.getCreateDate();
+ this.setCreateDate(opInfoEntity.getCreateDate());
}
if (TStringUtils.isNotBlank(opInfoEntity.getModifyUser())
&& !Objects.equals(modifyUser, opInfoEntity.getModifyUser())) {
@@ -108,7 +103,7 @@ public class BaseEntity implements Serializable, Cloneable {
if (opInfoEntity.getModifyDate() != null
&& !Objects.equals(modifyDate, opInfoEntity.getModifyDate())) {
changed = true;
- this.modifyDate = opInfoEntity.getModifyDate();
+ this.setModifyDate(opInfoEntity.getModifyDate());
}
if (TStringUtils.isNotBlank(opInfoEntity.getAttributes())
&& !Objects.equals(attributes, opInfoEntity.getAttributes())) {
@@ -137,7 +132,7 @@ public class BaseEntity implements Serializable, Cloneable {
if (newCreateDate != null
&& !Objects.equals(createDate, newCreateDate)) {
changed = true;
- this.createDate = newCreateDate;
+ this.setCreateDate(newCreateDate);
}
if (TStringUtils.isNotBlank(newModifyUser)
&& !Objects.equals(modifyUser, newModifyUser)) {
@@ -147,7 +142,7 @@ public class BaseEntity implements Serializable, Cloneable {
if (newModifyDate != null
&& !Objects.equals(modifyDate, newModifyDate)) {
changed = true;
- this.modifyDate = newModifyDate;
+ this.setModifyDate(newModifyDate);
}
if (TStringUtils.isNotBlank(newAttributes)
&& !Objects.equals(attributes, newAttributes)) {
@@ -206,6 +201,14 @@ public class BaseEntity implements Serializable, Cloneable {
return modifyDate;
}
+ public String getCreateDateStr() {
+ return createDateStr;
+ }
+
+ public String getModifyDateStr() {
+ return modifyDateStr;
+ }
+
public String toJsonString(Gson gson) {
return gson.toJson(this);
}
@@ -247,26 +250,33 @@ public class BaseEntity implements Serializable, Cloneable {
StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
if (isLongName) {
sBuilder.append(",\"dataVersionId\":").append(dataVersionId)
+ .append(",\"serialId\":").append(serialId)
.append(",\"createUser\":\"").append(createUser).append("\"")
- .append(",\"createDate\":\"")
- .append(WebParameterUtils.date2yyyyMMddHHmmss(createDate)).append("\"")
+ .append(",\"createDate\":\"").append(createDateStr).append("\"")
.append(",\"modifyUser\":\"").append(modifyUser).append("\"")
- .append(",\"modifyDate\":\"")
- .append(WebParameterUtils.date2yyyyMMddHHmmss(modifyDate)).append("\"")
+ .append(",\"modifyDate\":\"").append(modifyDateStr).append("\"")
.append(",\"attributes\":\"").append(attributes).append("\"");
} else {
sBuilder.append(",\"dVerId\":").append(dataVersionId)
.append(",\"cur\":\"").append(createUser).append("\"")
- .append(",\"cDate\":\"")
- .append(WebParameterUtils.date2yyyyMMddHHmmss(createDate)).append("\"")
+ .append(",\"cDate\":\"").append(createDateStr).append("\"")
.append(",\"mur\":\"").append(modifyUser).append("\"")
- .append(",\"mDate\":\"")
- .append(WebParameterUtils.date2yyyyMMddHHmmss(modifyDate)).append("\"")
+ .append(",\"mDate\":\"").append(modifyDateStr).append("\"")
.append(",\"attrs\":\"").append(attributes).append("\"");
}
return sBuilder;
}
+ private void setModifyDate(Date date) {
+ this.modifyDate = date;
+ this.modifyDateStr = WebParameterUtils.date2yyyyMMddHHmmss(date);
+ }
+
+ private void setCreateDate(Date date) {
+ this.createDate = date;
+ this.createDateStr = WebParameterUtils.date2yyyyMMddHHmmss(date);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
index 13d8c47..a922c15 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
@@ -220,7 +220,7 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
this.flowCtrlInfo = flowCtrlInfo;
}
- private void setConsumeEnable(boolean enableConsume) {
+ public void setConsumeEnable(boolean enableConsume) {
if (enableConsume) {
this.consumeEnable = EnableStatus.STATUS_ENABLE;
} else {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index 8b3fd99..7d879d2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -35,28 +35,23 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
private String recordKey = "";
private String topicName = "";
private int brokerId = TBaseConstants.META_VALUE_UNDEFINED;
+ private TopicStatus deployStatus = TopicStatus.STATUS_TOPIC_UNDEFINED; // topic status
+ private TopicPropGroup topicProps = new TopicPropGroup();
private String brokerIp = "";
private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED;
private String brokerAddress = "";
- // topic id, require globally unique
private int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
- private TopicStatus deployStatus = TopicStatus.STATUS_TOPIC_UNDEFINED; // topic status
- private TopicPropGroup topicProps = new TopicPropGroup();
public TopicDeployEntity() {
super();
}
- public TopicDeployEntity(BaseEntity opInfoEntity, int brokerId,
- String brokerIp, int brokerPort, String topicName) {
+ public TopicDeployEntity(BaseEntity opInfoEntity, int brokerId, String topicName) {
super(opInfoEntity);
this.brokerId = brokerId;
- this.brokerIp = brokerIp;
- this.brokerPort = brokerPort;
this.topicName = topicName;
this.recordKey = KeyBuilderUtils.buildTopicConfRecKey(brokerId, topicName);
- this.brokerAddress = KeyBuilderUtils.buildAddressInfo(brokerIp, brokerPort);
}
public TopicDeployEntity(String topicName, int topicId, int brokerId,
@@ -69,6 +64,8 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
this.topicNameId = topicId;
this.deployStatus = deployStatus;
this.topicProps = topicProps;
+ this.brokerAddress = KeyBuilderUtils.buildAddressInfo(brokerIp, brokerPort);
+
}
public TopicDeployEntity(BdbTopicConfEntity bdbEntity) {
@@ -110,8 +107,8 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
return bdbEntity;
}
- public void setTopicDeployInfo(int brokerId, String brokerIp,
- int brokerPort, String topicName) {
+ private void setTopicDeployInfo(int brokerId, String brokerIp,
+ int brokerPort, String topicName) {
this.brokerId = brokerId;
this.brokerIp = brokerIp;
this.brokerPort = brokerPort;
@@ -281,7 +278,7 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
&& !target.getTopicName().equals(this.topicName))
|| (TStringUtils.isNotBlank(target.getBrokerIp())
&& !target.getBrokerIp().equals(this.brokerIp))
- || !target.getTopicProps().isMatched(topicProps)
+ || !topicProps.isMatched(target.topicProps)
|| (target.getTopicStatus() != TopicStatus.STATUS_TOPIC_UNDEFINED
&& target.getTopicStatus() != this.deployStatus)) {
return false;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
index e94fdfd..713ad26 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
@@ -52,8 +52,8 @@ public interface GroupConsumeCtrlMapper extends AbstractMapper {
GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName);
- Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupAndTopic(
- Set<String> groupNameSet, Set<String> topicNameSet);
+ Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(
+ Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry);
List<GroupConsumeCtrlEntity> getGroupConsumeCtrlConf(GroupConsumeCtrlEntity qryEntity);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupResCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupResCtrlMapper.java
index 387e536..7775bb4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupResCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupResCtrlMapper.java
@@ -35,6 +35,13 @@ public interface GroupResCtrlMapper extends AbstractMapper {
GroupResCtrlEntity getGroupResCtrlConf(String groupName);
- Map<String, GroupResCtrlEntity> getGroupResCtrlConf(Set<String> groupSet,
+ /**
+ * Get group resource control entity
+ *
+ * @param groupNameSet the group name set to query
+ * @param qryEntity must not be null
+ * @return group resource control info keyed by group name
+ */
+ Map<String, GroupResCtrlEntity> getGroupResCtrlConf(Set<String> groupNameSet,
GroupResCtrlEntity qryEntity);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index 5e28b05..45300fe 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -47,6 +47,14 @@ public interface TopicDeployMapper extends AbstractMapper {
TopicDeployEntity getTopicConfByeRecKey(String recordKey);
+ /**
+ * Get broker topic entity, if query entity is null, return all topic entity
+ *
+ * @param topicNameSet the topic name set to query
+ * @param brokerIdSet the broker id set to query
+ * @param qryEntity must not be null
+ * @return topic deploy info keyed by topic name
+ */
Map<String, List<TopicDeployEntity>> getTopicConfMap(Set<String> topicNameSet,
Set<Integer> brokerIdSet,
TopicDeployEntity qryEntity);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index d2e9166..e136df6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -232,7 +232,7 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
for (Integer brokerId : qryBrokerKey) {
BrokerConfEntity entity = brokerConfCache.get(brokerId);
if (entity == null
- || (qryEntity != null && !qryEntity.isMatched(entity))) {
+ || (qryEntity != null && !entity.isMatched(qryEntity))) {
continue;
}
retMap.put(entity.getBrokerId(), entity);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
index 8218753..171f970 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
@@ -280,53 +280,53 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
}
@Override
- public Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlByGroupAndTopic(
- Set<String> groupNameSet, Set<String> topicNameSet) {
- GroupConsumeCtrlEntity tmpEntity;
- List<GroupConsumeCtrlEntity> itemLst;
+ public Map<String/* group */, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(
+ Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
ConcurrentHashSet<String> recSet;
- Set<String> hitKeys = new HashSet<>();
+ Set<String> qryHitRecSet = null;
Map<String, List<GroupConsumeCtrlEntity>> retEntityMap = new HashMap<>();
- if (((groupNameSet == null) || (groupNameSet.isEmpty()))
- && ((topicNameSet == null) || (topicNameSet.isEmpty()))) {
- for (GroupConsumeCtrlEntity entity : grpConsumeCtrlCache.values()) {
- itemLst = retEntityMap.get(entity.getGroupName());
- if (itemLst == null) {
- itemLst = new ArrayList<>();
- retEntityMap.put(entity.getTopicName(), itemLst);
+ // filter group items
+ if (groupSet != null && !groupSet.isEmpty()) {
+ qryHitRecSet = new HashSet<>();
+ for (String group : groupSet) {
+ recSet = grpConsumeCtrlGroupCache.get(group);
+ if (recSet != null && !recSet.isEmpty()) {
+ qryHitRecSet.addAll(recSet);
}
- itemLst.add(entity);
}
- return retEntityMap;
}
- if ((groupNameSet == null) || (groupNameSet.isEmpty())) {
- for (String topicName : topicNameSet) {
- recSet = grpConsumeCtrlTopicCache.get(topicName);
- if (recSet == null || recSet.isEmpty()) {
- continue;
- }
- hitKeys.addAll(recSet);
+ // filter topic items
+ if (topicSet != null && !topicSet.isEmpty()) {
+ if (qryHitRecSet == null) {
+ qryHitRecSet = new HashSet<>();
}
- } else {
- for (String groupName : groupNameSet) {
- recSet = grpConsumeCtrlGroupCache.get(groupName);
- if (recSet == null || recSet.isEmpty()) {
- continue;
+ for (String topic : topicSet) {
+ recSet = grpConsumeCtrlTopicCache.get(topic);
+ if (recSet != null && !recSet.isEmpty()) {
+ qryHitRecSet.addAll(recSet);
}
- hitKeys.addAll(recSet);
}
}
- for (String key : hitKeys) {
- tmpEntity = grpConsumeCtrlCache.get(key);
- if (tmpEntity == null) {
- continue;
+ // get matched records
+ GroupConsumeCtrlEntity tmpEntity;
+ List<GroupConsumeCtrlEntity> itemLst;
+ if (qryHitRecSet == null) {
+ for (GroupConsumeCtrlEntity entity : grpConsumeCtrlCache.values()) {
+ if (entity != null && entity.isMatched(qryEntry)) {
+ itemLst = retEntityMap.computeIfAbsent(
+ entity.getGroupName(), k -> new ArrayList<>());
+ itemLst.add(entity);
+ }
}
- itemLst = retEntityMap.get(tmpEntity.getTopicName());
- if (itemLst == null) {
- itemLst = new ArrayList<>();
- retEntityMap.put(tmpEntity.getTopicName(), itemLst);
+ } else {
+ for (String recKey : qryHitRecSet) {
+ tmpEntity = grpConsumeCtrlCache.get(recKey);
+ if (tmpEntity != null && tmpEntity.isMatched(qryEntry)) {
+ itemLst = retEntityMap.computeIfAbsent(
+ tmpEntity.getGroupName(), k -> new ArrayList<>());
+ itemLst.add(tmpEntity);
+ }
}
- itemLst.add(tmpEntity);
}
return retEntityMap;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
index 327c3c3..6c61370 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
@@ -24,7 +24,6 @@ import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -165,23 +164,23 @@ public class BdbGroupResCtrlMapperImpl implements GroupResCtrlMapper {
}
@Override
- public Map<String, GroupResCtrlEntity> getGroupResCtrlConf(Set<String> groupSet,
+ public Map<String, GroupResCtrlEntity> getGroupResCtrlConf(Set<String> groupNameSet,
GroupResCtrlEntity qryEntity) {
GroupResCtrlEntity entity;
- Set<String> qryKeySet = new HashSet<>();
Map<String, GroupResCtrlEntity> retMap = new HashMap<>();
- if (groupSet == null || groupSet.isEmpty()) {
- qryKeySet.addAll(groupBaseCtrlCache.keySet());
+ if (groupNameSet == null || groupNameSet.isEmpty()) {
+ for (GroupResCtrlEntity dataEntity : groupBaseCtrlCache.values()) {
+ if (dataEntity != null && dataEntity.isMatched(qryEntity)) {
+ retMap.put(dataEntity.getGroupName(), dataEntity);
+ }
+ }
} else {
- qryKeySet.addAll(groupSet);
- }
- for (String group : qryKeySet) {
- entity = groupBaseCtrlCache.get(group);
- if (entity == null
- || (qryEntity != null && !qryEntity.isMatched(qryEntity))) {
- continue;
+ for (String groupName : groupNameSet) {
+ entity = groupBaseCtrlCache.get(groupName);
+ if (entity != null && entity.isMatched(qryEntity)) {
+ retMap.put(entity.getGroupName(), entity);
+ }
}
- retMap.put(entity.getGroupName(), entity);
}
return retMap;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
index c38ea16..22002ce 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
@@ -236,6 +236,7 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
Set<String> qryTopicKey = null;
ConcurrentHashSet<String> keySet;
Map<String, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
+ // get deploy records set by topicName
if (topicNameSet != null && !topicNameSet.isEmpty()) {
qryTopicKey = new HashSet<>();
for (String topicName : topicNameSet) {
@@ -245,6 +246,7 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
}
}
}
+ // get deploy records set by brokerId
if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
if (qryTopicKey == null) {
qryTopicKey = new HashSet<>();
@@ -256,24 +258,24 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
}
}
}
+ // filter record by qryEntity
if (qryTopicKey == null) {
- qryTopicKey = new HashSet<>(topicConfCache.keySet());
- }
- if (qryTopicKey.isEmpty()) {
- return retEntityMap;
- }
- for (String recordKey: qryTopicKey) {
- TopicDeployEntity entity = topicConfCache.get(recordKey);
- if (entity == null
- || (qryEntity != null && !qryEntity.isMatched(entity))) {
- continue;
+ for (TopicDeployEntity deployEntity : topicConfCache.values()) {
+ if (deployEntity != null && deployEntity.isMatched(qryEntity)) {
+ items = retEntityMap.computeIfAbsent(
+ deployEntity.getTopicName(), k -> new ArrayList<>());
+ items.add(deployEntity);
+ }
}
- items = retEntityMap.get(entity.getTopicName());
- if (items == null) {
- items = new ArrayList<>();
- retEntityMap.put(entity.getTopicName(), items);
+ } else {
+ for (String recKey : qryTopicKey) {
+ TopicDeployEntity deployEntity = topicConfCache.get(recKey);
+ if (deployEntity != null && deployEntity.isMatched(qryEntity)) {
+ items = retEntityMap.computeIfAbsent(
+ deployEntity.getTopicName(), k -> new ArrayList<>());
+ items.add(deployEntity);
+ }
}
- items.add(entity);
}
return retEntityMap;
}
@@ -380,31 +382,32 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
@Override
public Map<Integer, Set<String>> getConfiguredTopicInfo(Set<Integer> brokerIdSet) {
- Set<String> items;
+ Set<String> topicSet;
+ ConcurrentHashSet<String> deploySet;
Map<Integer, Set<String>> retEntityMap = new HashMap<>();
if (brokerIdSet == null || brokerIdSet.isEmpty()) {
- brokerIdSet = new HashSet<>();
- brokerIdSet.addAll(brokerId2TopicCacheIndex.keySet());
- }
- for (Integer brokerId : brokerIdSet) {
- if (brokerId == null) {
- continue;
- }
- ConcurrentHashSet<String> topicSet =
- brokerId2TopicCacheIndex.get(brokerId);
- if (topicSet == null || topicSet.isEmpty()) {
- continue;
- }
- items = retEntityMap.get(brokerId);
- if (items == null) {
- items = new HashSet<>();
- retEntityMap.put(brokerId, items);
+ for (Map.Entry<Integer, ConcurrentHashSet<String>> entry
+ : brokerId2TopicCacheIndex.entrySet()) {
+ if (entry.getKey() == null) {
+ continue;
+ }
+ topicSet = new HashSet<>();
+ if (entry.getValue() != null) {
+ topicSet.addAll(entry.getValue());
+ }
+ retEntityMap.put(entry.getKey(), topicSet);
}
- for (String topic : topicSet) {
- if (topic == null) {
+ } else {
+ for (Integer brokerId : brokerIdSet) {
+ if (brokerId == null) {
continue;
}
- items.add(topic);
+ topicSet = new HashSet<>();
+ deploySet = brokerId2TopicCacheIndex.get(brokerId);
+ if (deploySet != null) {
+ topicSet.addAll(deploySet);
+ }
+ retEntityMap.put(brokerId, topicSet);
}
}
return retEntityMap;
@@ -412,32 +415,33 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
@Override
public Map<String, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet) {
- Map<Integer, String> items;
+ ConcurrentHashSet<String> keySet;
+ Map<Integer, String> brokerInfoMap;
Map<String, Map<Integer, String>> retEntityMap = new HashMap<>();
if (topicNameSet == null || topicNameSet.isEmpty()) {
- topicNameSet = new HashSet<>();
- topicNameSet.addAll(topicNameCacheIndex.keySet());
- }
- for (String topicName : topicNameSet) {
- if (topicName == null) {
- continue;
- }
- ConcurrentHashSet<String> keySet =
- topicNameCacheIndex.get(topicName);
- if (keySet == null || keySet.isEmpty()) {
- continue;
+ for (TopicDeployEntity entry : topicConfCache.values()) {
+ if (entry == null) {
+ continue;
+ }
+ brokerInfoMap = retEntityMap.computeIfAbsent(
+ entry.getTopicName(), k -> new HashMap<>());
+ brokerInfoMap.put(entry.getBrokerId(), entry.getBrokerIp());
}
- for (String key : keySet) {
- TopicDeployEntity entity = topicConfCache.get(key);
- if (entity == null) {
+ } else {
+ for (String topicName : topicNameSet) {
+ if (topicName == null) {
continue;
}
- items = retEntityMap.get(topicName);
- if (items == null) {
- items = new HashMap<>();
- retEntityMap.put(topicName, items);
+ brokerInfoMap = retEntityMap.computeIfAbsent(topicName, k -> new HashMap<>());
+ keySet = topicNameCacheIndex.get(topicName);
+ if (keySet != null) {
+ for (String key : keySet) {
+ TopicDeployEntity entry = topicConfCache.get(key);
+ if (entry != null) {
+ brokerInfoMap.put(entry.getBrokerId(), entry.getBrokerIp());
+ }
+ }
}
- items.put(entity.getBrokerId(), entity.getBrokerIp());
}
}
return retEntityMap;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
index bdc7633..0422524 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
@@ -32,10 +32,12 @@ import org.apache.tubemq.server.master.web.handler.AbstractWebHandler;
import org.apache.tubemq.server.master.web.handler.WebAdminGroupCtrlHandler;
import org.apache.tubemq.server.master.web.handler.WebAdminTopicAuthHandler;
import org.apache.tubemq.server.master.web.handler.WebBrokerDefConfHandler;
-import org.apache.tubemq.server.master.web.handler.WebBrokerTopicConfHandler;
+import org.apache.tubemq.server.master.web.handler.WebGroupConsumeCtrlHandler;
import org.apache.tubemq.server.master.web.handler.WebGroupResCtrlHandler;
import org.apache.tubemq.server.master.web.handler.WebMasterInfoHandler;
import org.apache.tubemq.server.master.web.handler.WebOtherInfoHandler;
+import org.apache.tubemq.server.master.web.handler.WebTopicCtrlHandler;
+import org.apache.tubemq.server.master.web.handler.WebTopicDeployHandler;
import org.apache.tubemq.server.master.web.simplemvc.Action;
import org.apache.tubemq.server.master.web.simplemvc.RequestContext;
@@ -56,12 +58,14 @@ public class Webapi implements Action {
public Webapi(TMaster master) {
this.master = master;
registerHandler(new WebBrokerDefConfHandler(this.master));
- registerHandler(new WebBrokerTopicConfHandler(this.master));
- registerHandler(new WebAdminGroupCtrlHandler(this.master));
- registerHandler(new WebAdminTopicAuthHandler(this.master));
+ registerHandler(new WebTopicCtrlHandler(this.master));
+ registerHandler(new WebTopicDeployHandler(this.master));
+ registerHandler(new WebGroupConsumeCtrlHandler(this.master));
registerHandler(new WebGroupResCtrlHandler(this.master));
registerHandler(new WebMasterInfoHandler(this.master));
registerHandler(new WebOtherInfoHandler(this.master));
+ registerHandler(new WebAdminGroupCtrlHandler(this.master));
+ registerHandler(new WebAdminTopicAuthHandler(this.master));
}
@Override
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 58e47bd..f587849 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -17,7 +17,6 @@
package org.apache.tubemq.server.master.web.handler;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -30,13 +29,17 @@ import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
@@ -101,6 +104,363 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
}
/**
+ * Query black consumer group info, i.e. the group resource-control
+ * records whose consume is disabled.
+ *
+ * @param req the HTTP request; it may carry operate-info conditions
+ * (createUser, modifyUser, dataVersionId) and a group name list
+ * @return the response text: a failure result when a parameter check fails,
+ * otherwise the matched records followed by their count
+ */
+ public StringBuilder adminQueryBlackGroupInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // build query entity
+ GroupResCtrlEntity entity = new GroupResCtrlEntity();
+ // get queried operation info, for createUser, modifyUser, dataVersionId
+ if (!WebParameterUtils.getQueriedOperateInfo(req, entity, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // get group list (optional parameter)
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNameSet = (Set<String>) result.retData1;
+ // only query disable consume group
+ entity.setConsumeEnable(false);
+ Map<String, GroupResCtrlEntity> qryResult =
+ metaDataManager.confGetGroupResCtrlConf(groupNameSet, entity);
+ int totalCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (GroupResCtrlEntity entry : qryResult.values()) {
+ // skip null records, as adminQueryConsumeGroupSetting() does for the same query
+ if (entry == null) {
+ continue;
+ }
+ // records after the first one are separated by a comma
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ sBuffer.append("{\"groupName\":\"").append(entry.getGroupName()).append("\"")
+ .append(",\"reason\":\"").append(entry.getDisableReason()).append("\"")
+ .append(",\"dataVersionId\":").append(entry.getDataVerId())
+ .append(",\"createUser\":\"").append(entry.getCreateUser()).append("\"")
+ .append(",\"createDate\":\"").append(entry.getCreateDateStr()).append("\"")
+ .append(",\"modifyUser\":\"").append(entry.getModifyUser()).append("\"")
+ .append(",\"modifyDate\":\"").append(entry.getModifyDateStr()).append("\"}");
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
+ }
+
+ /**
+ * Query allowed(authorized?) consumer group info
+ *
+ * @param req the HTTP request; it may carry operate-info conditions
+ * (createUser, modifyUser, dataVersionId), a group name list and a topic name list
+ * @return the response text: a failure result when a parameter check fails,
+ * otherwise the matched records followed by their count
+ */
+ public StringBuilder adminQueryConsumerGroupInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // build query entity
+ GroupConsumeCtrlEntity qryEntity = new GroupConsumeCtrlEntity();
+ // get queried operation info, for createUser, modifyUser, dataVersionId
+ if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // get group list (optional parameter)
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNameSet = (Set<String>) result.retData1;
+ // check and get topicName field (optional parameter)
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ Map<String, List<GroupConsumeCtrlEntity>> qryResultMap =
+ metaDataManager.getGroupConsumeCtrlConf(groupNameSet, topicNameSet, qryEntity);
+ int totalCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (List<GroupConsumeCtrlEntity> entryLst : qryResultMap.values()) {
+ if (entryLst == null || entryLst.isEmpty()) {
+ continue;
+ }
+ for (GroupConsumeCtrlEntity entry : entryLst) {
+ if (entry == null) {
+ continue;
+ }
+ // records after the first one are separated by a comma
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ // the groupName value must be closed with a quote before the next key
+ sBuffer.append("{\"topicName\":\"").append(entry.getTopicName())
+ .append("\",\"groupName\":\"").append(entry.getGroupName())
+ .append("\",\"dataVersionId\":").append(entry.getDataVerId())
+ .append(",\"createUser\":\"").append(entry.getCreateUser()).append("\"")
+ .append(",\"createDate\":\"").append(entry.getCreateDateStr()).append("\"")
+ .append(",\"modifyUser\":\"").append(entry.getModifyUser()).append("\"")
+ .append(",\"modifyDate\":\"").append(entry.getModifyDateStr()).append("\"}");
+ }
+ }
+ // close the response exactly once, after every list has been written
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
+ }
+
+ /**
+ * Query group filter condition info
+ *
+ * The consume-control records are selected by the optional group name list,
+ * topic name list, condStatus value and operate-info conditions; a selected
+ * record is only output when WebParameterUtils.isFilterSetFullIncluded()
+ * accepts it against the requested filter condition set.
+ *
+ * @param req the HTTP request holding the query parameters
+ * @return the response text: a failure result when a parameter check fails,
+ * otherwise the output records followed by their count
+ */
+ public StringBuilder adminQueryGroupFilterCondInfo(HttpServletRequest req) {
+ StringBuilder sBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ // this entity carries the match conditions of the query
+ GroupConsumeCtrlEntity condEntity = new GroupConsumeCtrlEntity();
+ // createUser, modifyUser, dataVersionId conditions
+ if (!WebParameterUtils.getQueriedOperateInfo(req, condEntity, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // optional group name list
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNames = (Set<String>) result.retData1;
+ // optional topic name list
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNames = (Set<String>) result.retData1;
+ // optional condStatus value
+ if (!getCondStatusParamValue(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Boolean condEnable = (Boolean) result.getRetData();
+ // optional filterConds items
+ if (!WebParameterUtils.getFilterCondSet(req, false, true, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> wantedConds = (Set<String>) result.retData1;
+ // put the condStatus condition into the query entity, then run the query
+ condEntity.updModifyInfo(condEntity.getDataVerId(),
+ null, null, condEnable, null);
+ Map<String, List<GroupConsumeCtrlEntity>> matchedMap =
+ metaDataManager.getGroupConsumeCtrlConf(groupNames, topicNames, condEntity);
+ // write the records
+ int recordCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (List<GroupConsumeCtrlEntity> ctrlList : matchedMap.values()) {
+ if (ctrlList == null || ctrlList.isEmpty()) {
+ continue;
+ }
+ for (GroupConsumeCtrlEntity item : ctrlList) {
+ if (item == null) {
+ continue;
+ }
+ String condStr = item.getFilterCondStr();
+ if (!WebParameterUtils.isFilterSetFullIncluded(wantedConds, condStr)) {
+ continue;
+ }
+ // records after the first one are separated by a comma
+ if (recordCnt > 0) {
+ sBuffer.append(",");
+ }
+ recordCnt++;
+ // condStatus is reported as 2 when the filter is enabled, otherwise 0
+ int condStatusId;
+ if (item.getFilterEnable().isEnable()) {
+ condStatusId = 2;
+ } else {
+ condStatusId = 0;
+ }
+ // a condition string of at most 2 characters is reported as empty
+ String shownConds;
+ if (condStr.length() <= 2) {
+ shownConds = "";
+ } else {
+ shownConds = condStr;
+ }
+ sBuffer.append("{\"topicName\":\"").append(item.getTopicName())
+ .append("\",\"groupName\":\"").append(item.getGroupName())
+ .append("\",\"condStatus\":").append(condStatusId)
+ .append(",\"filterConds\":\"").append(shownConds)
+ .append("\",\"dataVersionId\":").append(item.getDataVerId())
+ .append(",\"createUser\":\"").append(item.getCreateUser())
+ .append("\",\"createDate\":\"").append(item.getCreateDateStr())
+ .append("\",\"modifyUser\":\"").append(item.getModifyUser())
+ .append("\",\"modifyDate\":\"").append(item.getModifyDateStr()).append("\"}");
+ }
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, recordCnt);
+ return sBuffer;
+ }
+
+ /**
+ * Query consumer group setting
+ *
+ * @param req the HTTP request; it may carry operate-info conditions
+ * (createUser, modifyUser, dataVersionId), a group name list and an
+ * allowed broker-client rate value
+ * @return the response text: a failure result when a parameter check fails,
+ * otherwise the matched records followed by their count
+ */
+ public StringBuilder adminQueryConsumeGroupSetting(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // build query entity
+ GroupResCtrlEntity entity = new GroupResCtrlEntity();
+ // get queried operation info, for createUser, modifyUser, dataVersionId
+ if (!WebParameterUtils.getQueriedOperateInfo(req, entity, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // get group list (optional parameter)
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNameSet = (Set<String>) result.retData1;
+ // get allowed broker-client rate (optional parameter)
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.OLDALWDBCRATE, false,
+ TBaseConstants.META_VALUE_UNDEFINED, 0, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ int allowedBClientRate = (int) result.getRetData();
+ // query matched records
+ entity.updModifyInfo(entity.getDataVerId(), null, null,
+ null, allowedBClientRate, TBaseConstants.META_VALUE_UNDEFINED,
+ null, TBaseConstants.META_VALUE_UNDEFINED, null);
+ Map<String, GroupResCtrlEntity> groupResCtrlEntityMap =
+ metaDataManager.confGetGroupResCtrlConf(groupNameSet, entity);
+ // build return result
+ int totalCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (GroupResCtrlEntity entry : groupResCtrlEntityMap.values()) {
+ if (entry == null) {
+ continue;
+ }
+ // records after the first one are separated by a comma
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ // enableBind, attributes and lastBindUsedDate are output as fixed values;
+ // the lastBindUsedDate value is already closed by its own quote, so the
+ // next key starts with a comma only
+ sBuffer.append("{\"groupName\":\"").append(entry.getGroupName())
+ .append("\",\"enableBind\":1,\"allowedBClientRate\":")
+ .append(entry.getAllowedBrokerClientRate())
+ .append(",\"attributes\":\"\",\"lastBindUsedDate\":\"-\"")
+ .append(",\"dataVersionId\":").append(entry.getDataVerId())
+ .append(",\"createUser\":\"").append(entry.getCreateUser())
+ .append("\",\"createDate\":\"").append(entry.getCreateDateStr())
+ .append("\",\"modifyUser\":\"").append(entry.getModifyUser())
+ .append("\",\"modifyDate\":\"").append(entry.getModifyDateStr()).append("\"}");
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
+ }
+
+ /**
+ * Add black consumer group info
+ *
+ * Each group name in the request is passed to
+ * metaDataManager.enOrDisConsumeCtrlConf() with Boolean.FALSE and the
+ * text "Old API Set" (presumably the consume-enable flag and the disable reason).
+ *
+ * @param req the HTTP request; the authorization info and the group name list
+ * are read as required values
+ * @return the response text: a failure result when a check fails, otherwise
+ * the text built by buildRetInfo() from the per-group results
+ */
+ public StringBuilder adminAddBlackGroupInfo(HttpServletRequest req) {
+ StringBuilder sBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ // the request must pass the authorization check first
+ boolean authorized = WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result);
+ if (!authorized) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // operation info of this change
+ boolean hasOpInfo = WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result);
+ if (!hasOpInfo) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // required group name list
+ boolean hasGroups = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result);
+ if (!hasGroups) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNames = (Set<String>) result.retData1;
+ // process the groups one by one and keep every result
+ List<GroupProcessResult> outcomes = new ArrayList<>();
+ for (String name : groupNames) {
+ GroupProcessResult outcome = metaDataManager.enOrDisConsumeCtrlConf(
+ opEntity, name, Boolean.FALSE, "Old API Set", sBuffer, result);
+ outcomes.add(outcome);
+ }
+ return buildRetInfo(outcomes, sBuffer);
+ }
+
+ /**
+ * Add black consumer group info in batch
+ *
+ * The records to store are produced by getGroupJsonSetInfo() from the
+ * request; each of them is passed to metaDataManager.addOrUpdGroupResCtrlConf().
+ *
+ * @param req the HTTP request; the authorization info is read as a required value
+ * @return the response text: a failure result when a check fails, otherwise
+ * the text built by buildRetInfo() from the per-record results
+ */
+ public StringBuilder adminBatchAddBlackGroupInfo(HttpServletRequest req) {
+ StringBuilder sBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ // the request must pass the authorization check first
+ boolean authorized = WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result);
+ if (!authorized) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // operation info of this change
+ boolean hasOpInfo = WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result);
+ if (!hasOpInfo) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // records described by the group JSON set of the request
+ boolean hasRecords = getGroupJsonSetInfo(req, opEntity, sBuffer, result);
+ if (!hasRecords) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Map<String, GroupResCtrlEntity> recordMap =
+ (Map<String, GroupResCtrlEntity>) result.getRetData();
+ // add or update the records one by one and keep every result
+ List<GroupProcessResult> outcomes = new ArrayList<>();
+ for (GroupResCtrlEntity record : recordMap.values()) {
+ GroupProcessResult outcome =
+ metaDataManager.addOrUpdGroupResCtrlConf(record, sBuffer, result);
+ outcomes.add(outcome);
+ }
+ return buildRetInfo(outcomes, sBuffer);
+ }
+
+ /**
+ * Delete black consumer group info
+ *
+ * Each group name in the request is passed to
+ * metaDataManager.enOrDisConsumeCtrlConf() with Boolean.TRUE and the
+ * text "Old API Set" (presumably this re-enables consume for the group).
+ *
+ * @param req the HTTP request; the authorization info and the group name list
+ * are read as required values
+ * @return the response text: a failure result when a check fails, otherwise
+ * the text built by buildRetInfo() from the per-group results
+ */
+ public StringBuilder adminDeleteBlackGroupInfo(HttpServletRequest req) {
+ StringBuilder sBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ // the request must pass the authorization check first
+ boolean authorized = WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result);
+ if (!authorized) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // operation info of this change
+ boolean hasOpInfo = WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result);
+ if (!hasOpInfo) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // required group name list
+ boolean hasGroups = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result);
+ if (!hasGroups) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNames = (Set<String>) result.retData1;
+ // process the groups one by one and keep every result
+ List<GroupProcessResult> outcomes = new ArrayList<>();
+ for (String name : groupNames) {
+ GroupProcessResult outcome = metaDataManager.enOrDisConsumeCtrlConf(
+ opEntity, name, Boolean.TRUE, "Old API Set", sBuffer, result);
+ outcomes.add(outcome);
+ }
+ return buildRetInfo(outcomes, sBuffer);
+ }
+
+
+ /**
* Add group filter condition info
*
* @param req
@@ -648,91 +1008,6 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
- /**
- * Query group filter condition info
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminQueryGroupFilterCondInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- BdbGroupFilterCondEntity webGroupFilterCondEntity =
- new BdbGroupFilterCondEntity();
- try {
- webGroupFilterCondEntity
- .setTopicName(WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- false, null));
- webGroupFilterCondEntity
- .setConsumerGroupName(WebParameterUtils.validGroupParameter("groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- false, null));
- webGroupFilterCondEntity
- .setCreateUser(WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, null));
- webGroupFilterCondEntity
- .setControlStatus(WebParameterUtils.validIntDataParameter("condStatus",
- req.getParameter("condStatus"),
- false,
- TBaseConstants.META_VALUE_UNDEFINED,
- 0));
- Set<String> filterCondSet =
- WebParameterUtils.checkAndGetFilterCondSet(req.getParameter("filterConds"), true, false, sBuilder);
- List<BdbGroupFilterCondEntity> webGroupCondEntities =
- brokerConfManager.confGetBdbAllowedGroupFilterCondSet(webGroupFilterCondEntity);
- SimpleDateFormat formatter =
- new SimpleDateFormat(TBaseConstants.META_TMP_DATE_VALUE);
- int j = 0;
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
- for (BdbGroupFilterCondEntity entity : webGroupCondEntities) {
- if (!filterCondSet.isEmpty()) {
- String filterItems = entity.getFilterCondStr();
- if (filterItems.length() == 2
- && filterItems.equals(TServerConstants.BLANK_FILTER_ITEM_STR)) {
- continue;
- } else {
- boolean allInc = true;
- for (String filterCond : filterCondSet) {
- if (!filterItems.contains(filterCond)) {
- allInc = false;
- break;
- }
- }
- if (!allInc) {
- continue;
- }
- }
- }
- if (j++ > 0) {
- sBuilder.append(",");
- }
- sBuilder.append("{\"topicName\":\"").append(entity.getTopicName())
- .append("\",\"groupName\":\"").append(entity.getConsumerGroupName())
- .append("\",\"condStatus\":").append(entity.getControlStatus());
- if (entity.getFilterCondStr().length() <= 2) {
- sBuilder.append(",\"filterConds\":\"\"");
- } else {
- sBuilder.append(",\"filterConds\":\"")
- .append(entity.getFilterCondStr())
- .append("\"");
- }
- sBuilder.append(",\"createUser\":\"").append(entity.getCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(entity.getCreateDate()))
- .append("\"}");
- }
- sBuilder.append("],\"count\":").append(j).append("}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
- }
- return sBuilder;
- }
/**
* Add authorized consumer group info
@@ -897,59 +1172,6 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
- /**
- * Query allowed(authorized?) consumer group info
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminQueryConsumerGroupInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- BdbConsumerGroupEntity webConsumerGroupEntity =
- new BdbConsumerGroupEntity();
- try {
- webConsumerGroupEntity
- .setGroupTopicName(WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- false, null));
- webConsumerGroupEntity
- .setConsumerGroupName(WebParameterUtils.validGroupParameter(
- "groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- false, null));
- webConsumerGroupEntity
- .setRecordCreateUser(WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, null));
- List<BdbConsumerGroupEntity> webConsumerGroupEntities =
- brokerConfManager.confGetBdbAllowedConsumerGroupSet(webConsumerGroupEntity);
- SimpleDateFormat formatter =
- new SimpleDateFormat(TBaseConstants.META_TMP_DATE_VALUE);
- int j = 0;
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"count\":")
- .append(webConsumerGroupEntities.size()).append(",\"data\":[");
- for (BdbConsumerGroupEntity entity : webConsumerGroupEntities) {
- if (j++ > 0) {
- sBuilder.append(",");
- }
- sBuilder.append("{\"topicName\":\"").append(entity.getGroupTopicName())
- .append("\",\"groupName\":\"").append(entity.getConsumerGroupName())
- .append("\",\"createUser\":\"").append(entity.getRecordCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(entity.getRecordCreateDate()))
- .append("\"}");
- }
- sBuilder.append("]}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
- }
- return sBuilder;
- }
/**
* Delete allowed(authorized) consumer group info
@@ -1015,241 +1237,11 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
- /**
- * Add black consumer group info
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminAddBlackGroupInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate",
- req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- Set<String> batchOpTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
- true, true, brokerConfManager.getTotalConfiguredTopicNames(), sBuilder);
- Set<String> batchOpGroupNames =
- WebParameterUtils.getBatchGroupNames(req.getParameter("groupName"),
- true, false, null, sBuilder);
- for (String tmpGroupName : batchOpGroupNames) {
- for (String tmpTopicName : batchOpTopicNames) {
- BdbBlackGroupEntity webBlackGroupEntity =
- new BdbBlackGroupEntity(tmpTopicName,
- tmpGroupName, createUser, createDate);
- brokerConfManager.confAddBdbBlackConsumerGroup(webBlackGroupEntity);
- }
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
- }
- /**
- * Add black consumer group info in batch
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminBatchAddBlackGroupInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate",
- req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- List<Map<String, String>> jsonArray =
- WebParameterUtils.checkAndGetJsonArray("groupNameJsonSet",
- req.getParameter("groupNameJsonSet"),
- TBaseConstants.META_VALUE_UNDEFINED, true);
- if ((jsonArray == null) || (jsonArray.isEmpty())) {
- throw new Exception("Null value of groupNameJsonSet, please set the value first!");
- }
- Set<String> configuredTopicSet = brokerConfManager.getTotalConfiguredTopicNames();
- HashMap<String, BdbBlackGroupEntity> inBlackGroupEntityMap = new HashMap<>();
- for (int j = 0; j < jsonArray.size(); j++) {
- Map<String, String> groupObject = jsonArray.get(j);
- try {
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- groupObject.get("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- true, "");
- String groupTopicName =
- WebParameterUtils.validStringParameter("topicName",
- groupObject.get("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- true, "");
- String groupCreateUser =
- WebParameterUtils.validStringParameter("createUser",
- groupObject.get("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, null);
- Date groupCreateDate =
- WebParameterUtils.validDateParameter("createDate",
- groupObject.get("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, null);
- if ((TStringUtils.isBlank(groupCreateUser))
- || (groupCreateDate == null)) {
- groupCreateUser = createUser;
- groupCreateDate = createDate;
- }
- if (!configuredTopicSet.contains(groupTopicName)) {
- throw new Exception(sBuilder.append("Topic ").append(groupTopicName)
- .append(" not configure in master configure, please configure first!").toString());
- }
- String recordKey = sBuilder.append(groupName)
- .append("-").append(groupTopicName).toString();
- sBuilder.delete(0, sBuilder.length());
- inBlackGroupEntityMap.put(recordKey,
- new BdbBlackGroupEntity(groupTopicName,
- groupName, groupCreateUser, groupCreateDate));
- } catch (Exception ee) {
- sBuilder.delete(0, sBuilder.length());
- throw new Exception(sBuilder.append("Process data exception, data is :")
- .append(groupObject.toString())
- .append(", exception is : ")
- .append(ee.getMessage()).toString());
- }
- }
- if (inBlackGroupEntityMap.isEmpty()) {
- throw new Exception("Not found record in groupNameJsonSet parameter");
- }
- for (BdbBlackGroupEntity tmpBlackGroupEntity
- : inBlackGroupEntityMap.values()) {
- brokerConfManager.confAddBdbBlackConsumerGroup(tmpBlackGroupEntity);
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
- }
- /**
- * Query black consumer group info
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminQueryBlackGroupInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- BdbBlackGroupEntity webBlackGroupEntity =
- new BdbBlackGroupEntity();
- try {
- webBlackGroupEntity
- .setTopicName(WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- false, null));
- webBlackGroupEntity
- .setBlackGroupName(WebParameterUtils.validGroupParameter("groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- false, null));
- webBlackGroupEntity
- .setCreateUser(WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, null));
- List<BdbBlackGroupEntity> webBlackGroupEntities =
- brokerConfManager.confGetBdbBlackConsumerGroupSet(webBlackGroupEntity);
- SimpleDateFormat formatter =
- new SimpleDateFormat(TBaseConstants.META_TMP_DATE_VALUE);
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"count\":")
- .append(webBlackGroupEntities.size()).append(",\"data\":[");
- int j = 0;
- for (BdbBlackGroupEntity entity : webBlackGroupEntities) {
- if (j++ > 0) {
- sBuilder.append(",");
- }
- sBuilder.append("{\"topicName\":\"").append(entity.getTopicName())
- .append("\",\"groupName\":\"").append(entity.getBlackGroupName())
- .append("\",\"createUser\":\"").append(entity.getCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(entity.getCreateDate()))
- .append("\"}");
- }
- sBuilder.append("]}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
- }
- return sBuilder;
- }
- /**
- * Delete black consumer group info
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminDeleteBlackGroupInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- Set<String> batchOpGroupNames =
- WebParameterUtils.getBatchGroupNames(req.getParameter("groupName"),
- true, false, null, sBuilder);
- Set<String> batchOpTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
- false, false, null, sBuilder);
- if (batchOpTopicNames.isEmpty()) {
- for (String tmpGroupName : batchOpGroupNames) {
- BdbBlackGroupEntity webBlackGroupEntity =
- new BdbBlackGroupEntity();
- webBlackGroupEntity.setBlackGroupName(tmpGroupName);
- brokerConfManager.confDeleteBdbBlackConsumerGroupSet(webBlackGroupEntity);
- }
- } else {
- for (String tmpGroupName : batchOpGroupNames) {
- for (String tmpTopicName : batchOpTopicNames) {
- BdbBlackGroupEntity webBlackGroupEntity =
- new BdbBlackGroupEntity();
- webBlackGroupEntity.setBlackGroupName(tmpGroupName);
- webBlackGroupEntity.setTopicName(tmpTopicName);
- brokerConfManager.confDeleteBdbBlackConsumerGroupSet(webBlackGroupEntity);
- }
- }
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
- }
+
+
/**
* Add consumer group setting
@@ -1391,64 +1383,6 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
- /**
- * Query consumer group setting
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminQueryConsumeGroupSetting(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- BdbConsumeGroupSettingEntity queryEntity =
- new BdbConsumeGroupSettingEntity();
- try {
- queryEntity
- .setConsumeGroupName(WebParameterUtils.validGroupParameter("groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- false, null));
- queryEntity
- .setCreateUser(WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, null));
- queryEntity
- .setEnableBind(WebParameterUtils.validIntDataParameter("enableBind",
- req.getParameter("enableBind"),
- false, -2, 0));
- queryEntity
- .setAllowedBrokerClientRate(WebParameterUtils.validIntDataParameter("allowedBClientRate",
- req.getParameter("allowedBClientRate"),
- false, -2, 0));
- List<BdbConsumeGroupSettingEntity> resultEntities =
- brokerConfManager.confGetBdbConsumeGroupSetting(queryEntity);
- SimpleDateFormat formatter =
- new SimpleDateFormat(TBaseConstants.META_TMP_DATE_VALUE);
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"count\":")
- .append(resultEntities.size()).append(",\"data\":[");
- int j = 0;
- for (BdbConsumeGroupSettingEntity entity : resultEntities) {
- if (j++ > 0) {
- sBuilder.append(",");
- }
- sBuilder.append("{\"groupName\":\"").append(entity.getConsumeGroupName())
- .append("\",\"enableBind\":").append(entity.getEnableBind())
- .append(",\"allowedBClientRate\":").append(entity.getAllowedBrokerClientRate())
- .append(",\"attributes\":\"").append(entity.getAttributes())
- .append("\",\"lastBindUsedDate\":\"").append(entity.getLastBindUsedDate())
- .append("\",\"createUser\":\"").append(entity.getCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(entity.getCreateDate()))
- .append("\"}");
- }
- sBuilder.append("]}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
- }
- return sBuilder;
- }
/**
* Update consumer group setting
@@ -1550,4 +1484,87 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
+
+ /**
+  * Render the per-group processing results into the response buffer.
+  *
+  * The output is framed by WebParameterUtils.buildSuccessWithDataRetBegin and
+  * WebParameterUtils.buildSuccessWithDataRetEnd; between them every result is written
+  * as one JSON object with the fields groupName, success, errCode and errInfo,
+  * separated by commas. The string values are appended as-is, without escaping.
+  *
+  * @param retInfo   the per-group results to render
+  * @param sBuffer   the buffer the response text is appended to
+  * @return          the buffer instance that was passed in
+  */
+ private StringBuilder buildRetInfo(List<GroupProcessResult> retInfo,
+                                    StringBuilder sBuffer) {
+     WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+     int writtenCnt = 0;
+     for (GroupProcessResult item : retInfo) {
+         // a separator is only needed in front of the second and later items
+         if (writtenCnt > 0) {
+             sBuffer.append(",");
+         }
+         writtenCnt++;
+         sBuffer.append("{\"groupName\":\"").append(item.getGroupName()).append("\"");
+         sBuffer.append(",\"success\":").append(item.isSuccess());
+         sBuffer.append(",\"errCode\":").append(item.getErrCode());
+         sBuffer.append(",\"errInfo\":\"").append(item.getErrInfo()).append("\"}");
+     }
+     // the end helper receives the number of items that were written
+     WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, writtenCnt);
+     return sBuffer;
+ }
+
+ /**
+  * Parse the GROUPJSONSET request parameter into group resource control records.
+  *
+  * Every item of the JSON array must supply its operation info (checked through
+  * WebParameterUtils.getAUDBaseInfo with defOpEntity passed along) and a group name.
+  * One GroupResCtrlEntity is built per item and stored in a map keyed by group name,
+  * so a later item with the same group name replaces an earlier one.
+  *
+  * @param req           the HTTP request carrying the parameter
+  * @param defOpEntity   the operation info handed to the per-item check
+  * @param sBuffer       scratch buffer used by the checks and for the error text
+  * @param result        receives the group-name-to-entity map on success, or the
+  *                      failure details of the check that rejected the input
+  * @return              the success state of result after processing
+  */
+ private boolean getGroupJsonSetInfo(HttpServletRequest req, BaseEntity defOpEntity,
+                                     StringBuilder sBuffer, ProcessResult result) {
+     if (!WebParameterUtils.getJsonArrayParamValue(req,
+             WebFieldDef.GROUPJSONSET, true, null, result)) {
+         return result.success;
+     }
+     List<Map<String, String>> jsonItems =
+             (List<Map<String, String>>) result.retData1;
+     Map<String, GroupResCtrlEntity> groupEntityMap = new HashMap<>();
+     for (Map<String, String> jsonItem : jsonItems) {
+         // operation info of this item
+         if (!WebParameterUtils.getAUDBaseInfo(jsonItem,
+                 true, defOpEntity, sBuffer, result)) {
+             return result.isSuccess();
+         }
+         BaseEntity opEntity = (BaseEntity) result.getRetData();
+         // group name of this item
+         if (!WebParameterUtils.getStringParamValue(jsonItem,
+                 WebFieldDef.GROUPNAME, true, "", sBuffer, result)) {
+             return result.success;
+         }
+         String groupName = (String) result.retData1;
+         GroupResCtrlEntity groupEntity =
+                 new GroupResCtrlEntity(opEntity, groupName);
+         groupEntity.updModifyInfo(groupEntity.getDataVerId(),
+                 Boolean.FALSE, "Old API batch set", null,
+                 TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+                 null, TBaseConstants.META_VALUE_UNDEFINED, null);
+         groupEntityMap.put(groupEntity.getGroupName(), groupEntity);
+     }
+     if (!groupEntityMap.isEmpty()) {
+         result.setSuccResult(groupEntityMap);
+         return result.isSuccess();
+     }
+     // no record could be built: report it and leave the scratch buffer empty
+     String errInfo = sBuffer
+             .append("Not found record info in ")
+             .append(WebFieldDef.GROUPJSONSET.name)
+             .append(" parameter!").toString();
+     result.setFailResult(errInfo);
+     sBuffer.delete(0, sBuffer.length());
+     return result.isSuccess();
+ }
+
+ /**
+  * Read the CONDSTATUS field and convert it into a Boolean flag.
+  *
+  * The field is parsed as an integer in the range [0, 2] with
+  * TBaseConstants.META_VALUE_UNDEFINED as the value used when it is not supplied.
+  * A value of 2 is converted to Boolean.TRUE, any other supplied value to
+  * Boolean.FALSE, and an unsupplied field to defValue (which may be null).
+  *
+  * The converted flag is stored in result; the return value only reports whether
+  * the field could be parsed. Returning the flag itself, as this method did before,
+  * made a valid condStatus of 0 or 1 indistinguishable from a parse failure and
+  * threw a NullPointerException when defValue was null and the field was absent.
+  * NOTE(review): callers are outside this hunk - confirm they read the flag from
+  * result rather than from the return value.
+  *
+  * @param paramCntr   the container the field is read from
+  * @param required    whether the field must be present
+  * @param defValue    the flag to use when the field is not supplied, may be null
+  * @param sBuffer     scratch buffer handed to the parameter check
+  * @param result      receives the Boolean flag on success, or the failure details
+  * @return            true if the field was parsed successfully, false otherwise
+  */
+ private <T> boolean getCondStatusParamValue(T paramCntr, boolean required, Boolean defValue,
+                                             StringBuilder sBuffer, ProcessResult result) {
+     // check and get condStatus field
+     if (!WebParameterUtils.getIntParamValue(paramCntr, WebFieldDef.CONDSTATUS,
+             required, TBaseConstants.META_VALUE_UNDEFINED, 0, 2, sBuffer, result)) {
+         return result.isSuccess();
+     }
+     int paramValue = (int) result.getRetData();
+     // Keep this as if/else: a conditional expression mixing Boolean and boolean
+     // operands would unbox defValue and fail on null.
+     Boolean condStatus;
+     if (paramValue == TBaseConstants.META_VALUE_UNDEFINED) {
+         condStatus = defValue;
+     } else {
+         condStatus = (paramValue == 2) ? Boolean.TRUE : Boolean.FALSE;
+     }
+     result.setSuccResult(condStatus);
+     return result.isSuccess();
+ }
+
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
deleted file mode 100644
index 59ab5db..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ /dev/null
@@ -1,1549 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.web.handler;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.servlet.http.HttpServletRequest;
-import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.TokenConstants;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.tubemq.corebase.cluster.TopicInfo;
-import org.apache.tubemq.corebase.utils.SettingValidUtils;
-import org.apache.tubemq.corebase.utils.TStringUtils;
-import org.apache.tubemq.corebase.utils.Tuple2;
-import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.TStatusConstants;
-import org.apache.tubemq.server.common.fielddef.WebFieldDef;
-import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.common.utils.WebParameterUtils;
-import org.apache.tubemq.server.master.TMaster;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.TStoreConstants;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WebBrokerTopicConfHandler extends AbstractWebHandler {
-
- private static final Logger logger =
- LoggerFactory.getLogger(WebBrokerTopicConfHandler.class);
-
- /**
- * Constructor
- *
- * @param master tube master
- */
- public WebBrokerTopicConfHandler(TMaster master) {
- super(master);
- }
-
- @Override
- public void registerWebApiMethod() {
- // register query method
- registerQueryWebMethod("admin_query_topic_info",
- "adminQueryTopicCfgEntityAndRunInfo");
- registerQueryWebMethod("admin_query_broker_topic_config_info",
- "adminQueryBrokerTopicCfgAndRunInfo");
- registerQueryWebMethod("admin_query_topicName",
- "adminQuerySimpleTopicName");
- registerQueryWebMethod("admin_query_brokerId",
- "adminQuerySimpleBrokerId");
- // register modify method
- registerModifyWebMethod("admin_add_new_topic_record",
- "adminAddTopicEntityInfo");
- registerModifyWebMethod("admin_bath_add_new_topic_record",
- "adminBatchAddTopicEntityInfo");
- registerModifyWebMethod("admin_modify_topic_info",
- "adminModifyTopicEntityInfo");
- registerModifyWebMethod("admin_delete_topic_info",
- "adminDeleteTopicEntityInfo");
- registerModifyWebMethod("admin_redo_deleted_topic_info",
- "adminRedoDeleteTopicEntityInfo");
- registerModifyWebMethod("admin_remove_topic_info",
- "adminRemoveTopicEntityInfo");
- }
-
- /**
- * Add new topic record
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminAddTopicEntityInfo(HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- // user
- String createUser =
- WebParameterUtils.validStringParameter("createUser", req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH, true, "");
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser", req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH, false, createUser);
- // date
- Date createDate =
- WebParameterUtils.validDateParameter("createDate", req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, new Date());
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate", req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, createDate);
- // topic names
- Set<String> batchAddTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
- true, false, null, strBuffer);
- // broker IDs
- Set<BdbBrokerConfEntity> batchBrokerEntitySet =
- WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"), brokerConfManager, true,
- strBuffer);
- List<BdbTopicConfEntity> batchAddBdbTopicEntities = new ArrayList<>();
- List<BdbTopicAuthControlEntity> batchAddBdbTopicAuthControls = new ArrayList<>();
- // for each topic
- for (String topicName : batchAddTopicNames) {
- BdbTopicAuthControlEntity tmpTopicAuthControl =
- brokerConfManager.getBdbEnableAuthControlByTopicName(topicName);
- if (tmpTopicAuthControl == null) {
- batchAddBdbTopicAuthControls
- .add(new BdbTopicAuthControlEntity(topicName,
- false, createUser, createDate));
- }
- }
- // for each broker
- for (BdbBrokerConfEntity oldEntity : batchBrokerEntitySet) {
- if (oldEntity == null) {
- continue;
- }
- if (WebParameterUtils.checkBrokerInProcessing(oldEntity.getBrokerId(), brokerConfManager, strBuffer)) {
- throw new Exception(strBuffer.toString());
- }
- ConcurrentHashMap<String, BdbTopicConfEntity> brokerTopicEntityMap =
- brokerConfManager.getBrokerTopicConfEntitySet(oldEntity.getBrokerId());
- if (brokerTopicEntityMap != null) {
- for (String itemTopicName : batchAddTopicNames) {
- BdbTopicConfEntity tmpTopicConfEntity = brokerTopicEntityMap.get(itemTopicName);
- if (tmpTopicConfEntity != null) {
- if (tmpTopicConfEntity.isValidTopicStatus()) {
- throw new Exception(strBuffer.append("Topic of ").append(itemTopicName)
- .append(" has existed in broker topic configure by brokerId=")
- .append(oldEntity.getBrokerId()).toString());
- } else {
- throw new Exception(strBuffer.append("Topic of ").append(itemTopicName)
- .append(" is deleted softly in brokerId=").append(oldEntity.getBrokerId())
- .append(", please resume the record or hard removed first!").toString());
- }
- }
- }
- }
- final int defNumTopicStores = oldEntity.getNumTopicStores();
- final int defmemCacheMsgCntInK = oldEntity.getDftMemCacheMsgCntInK();
- final int defmemCacheMsgSizeInMB = oldEntity.getDftMemCacheMsgSizeInMB();
- final int defmemCacheFlushIntvl = oldEntity.getDftMemCacheFlushIntvl();
- String deleteWhen =
- WebParameterUtils.validDecodeStringParameter("deleteWhen",
- req.getParameter("deleteWhen"),
- TServerConstants.CFG_DELETEWHEN_MAX_LENGTH,
- false, oldEntity.getDftDeleteWhen());
- String deletePolicy =
- WebParameterUtils.validDeletePolicyParameter("deletePolicy",
- req.getParameter("deletePolicy"), false, oldEntity.getDftDeletePolicy());
- int numPartitions =
- WebParameterUtils.validIntDataParameter("numPartitions",
- req.getParameter("numPartitions"),
- false, oldEntity.getDftNumPartitions(), 1);
- int unflushThreshold =
- WebParameterUtils.validIntDataParameter("unflushThreshold",
- req.getParameter("unflushThreshold"),
- false, oldEntity.getDftUnflushThreshold(), 0);
- int unflushInterval =
- WebParameterUtils.validIntDataParameter("unflushInterval",
- req.getParameter("unflushInterval"),
- false, oldEntity.getDftUnflushInterval(), 1);
- int unFlushDataHold =
- WebParameterUtils.validIntDataParameter("unflushDataHold",
- req.getParameter("unflushDataHold"),
- false, oldEntity.getDftUnFlushDataHold(), 0);
- boolean acceptPublish =
- WebParameterUtils.validBooleanDataParameter("acceptPublish",
- req.getParameter("acceptPublish"),
- false, oldEntity.isAcceptPublish());
- boolean acceptSubscribe =
- WebParameterUtils.validBooleanDataParameter("acceptSubscribe",
- req.getParameter("acceptSubscribe"),
- false, oldEntity.isAcceptSubscribe());
- int numTopicStores =
- WebParameterUtils.validIntDataParameter("numTopicStores",
- req.getParameter("numTopicStores"),
- false, defNumTopicStores, 1);
- int memCacheMsgCntInK =
- WebParameterUtils.validIntDataParameter("memCacheMsgCntInK",
- req.getParameter("memCacheMsgCntInK"),
- false, defmemCacheMsgCntInK, 1);
- int memCacheMsgSizeInMB =
- WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
- req.getParameter("memCacheMsgSizeInMB"),
- false, defmemCacheMsgSizeInMB, 2);
- memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
- int memCacheFlushIntvl =
- WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
- req.getParameter("memCacheFlushIntvl"),
- false, defmemCacheFlushIntvl, 4000);
- int maxMsgSizeInMB =
- WebParameterUtils.validIntDataParameter("maxMsgSizeInMB",
- req.getParameter("maxMsgSizeInMB"),
- false, TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
- TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
- String attributes = strBuffer.append(TStoreConstants.TOKEN_STORE_NUM)
- .append(TokenConstants.EQ).append(numTopicStores)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_DATA_UNFLUSHHOLD)
- .append(TokenConstants.EQ).append(unFlushDataHold)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_CNT)
- .append(TokenConstants.EQ).append(memCacheMsgCntInK)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_SIZE)
- .append(TokenConstants.EQ).append(memCacheMsgSizeInMB)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL)
- .append(TokenConstants.EQ).append(memCacheFlushIntvl)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MAX_MSG_SIZE)
- .append(TokenConstants.EQ)
- .append(SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB))
- .toString();
- strBuffer.delete(0, strBuffer.length());
- for (String itemTopicName : batchAddTopicNames) {
- batchAddBdbTopicEntities.add(new BdbTopicConfEntity(oldEntity.getBrokerId(),
- oldEntity.getBrokerIp(), oldEntity.getBrokerPort(), itemTopicName,
- numPartitions, unflushThreshold, unflushInterval, deleteWhen,
- deletePolicy, acceptPublish, acceptSubscribe, numTopicStores,
- attributes, createUser, createDate, modifyUser, modifyDate));
- }
- }
- inAddTopicConfigInfo(batchAddBdbTopicEntities, batchAddBdbTopicAuthControls);
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * Add new topic record in batch
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminBatchAddTopicEntityInfo(HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser", req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH, true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate", req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, new Date());
- List<Map<String, String>> topicJsonArray =
- WebParameterUtils.checkAndGetJsonArray("topicJsonSet",
- req.getParameter("topicJsonSet"), TBaseConstants.META_VALUE_UNDEFINED, true);
- if ((topicJsonArray == null) || (topicJsonArray.isEmpty())) {
- throw new Exception("Null value of topicJsonSet, please set the value first!");
- }
- Set<String> batchAddTopicNames = new HashSet<>();
- Set<String> batchAddItemKeys = new HashSet<>();
- List<BdbTopicAuthControlEntity> batchTopicAuthInfos = new ArrayList<>();
- List<BdbTopicConfEntity> batchAddBdbTopicEntities = new ArrayList<>();
- for (int count = 0; count < topicJsonArray.size(); count++) {
- Map<String, String> jsonObject = topicJsonArray.get(count);
- try {
- int brokerId =
- WebParameterUtils.validIntDataParameter("brokerId",
- jsonObject.get("brokerId"), true, 0, 1);
- BdbBrokerConfEntity brokerConfEntity =
- brokerConfManager.getBrokerDefaultConfigStoreInfo(brokerId);
- if (brokerConfEntity == null) {
- throw new Exception(strBuffer
- .append("Not found broker default configure record by brokerId=").append(brokerId)
- .append(", please create the broker's default configure first!").toString());
- }
- String topicName =
- WebParameterUtils.validStringParameter("topicName", jsonObject.get("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, "");
- String inputKey = strBuffer.append(brokerId).append("-").append(topicName).toString();
- strBuffer.delete(0, strBuffer.length());
- if (batchAddItemKeys.contains(inputKey)) {
- continue;
- }
- if (WebParameterUtils.checkBrokerInProcessing(brokerId, brokerConfManager, strBuffer)) {
- throw new Exception(strBuffer.toString());
- }
- ConcurrentHashMap<String, BdbTopicConfEntity> brokerTopicEntityMap =
- brokerConfManager.getBrokerTopicConfEntitySet(brokerConfEntity.getBrokerId());
- if (brokerTopicEntityMap != null) {
- BdbTopicConfEntity tmpTopicConfEntity = brokerTopicEntityMap.get(topicName);
- if (tmpTopicConfEntity != null) {
- if (tmpTopicConfEntity.isValidTopicStatus()) {
- throw new Exception(strBuffer
- .append("Duplicate add broker's topic configure, exist record is: ")
- .append(tmpTopicConfEntity).toString());
- } else {
- throw new Exception(strBuffer.append("Topic of ").append(topicName)
- .append(" is deleted softly in brokerId=")
- .append(brokerId).append(", please resume the record or hard removed first!")
- .toString());
- }
- }
- }
- final String deleteWhen =
- WebParameterUtils.validDecodeStringParameter("deleteWhen",
- jsonObject.get("deleteWhen"),
- TServerConstants.CFG_DELETEWHEN_MAX_LENGTH,
- false, brokerConfEntity.getDftDeleteWhen());
- final String deletePolicy =
- WebParameterUtils.validDeletePolicyParameter("deletePolicy",
- jsonObject.get("deletePolicy"), false, brokerConfEntity.getDftDeletePolicy());
- final int numPartitions =
- WebParameterUtils.validIntDataParameter("numPartitions",
- jsonObject.get("numPartitions"),
- false, brokerConfEntity.getDftNumPartitions(), 1);
- final int unflushThreshold =
- WebParameterUtils.validIntDataParameter("unflushThreshold",
- jsonObject.get("unflushThreshold"),
- false, brokerConfEntity.getDftUnflushThreshold(), 0);
- final int unflushInterval =
- WebParameterUtils.validIntDataParameter("unflushInterval",
- jsonObject.get("unflushInterval"),
- false, brokerConfEntity.getDftUnflushInterval(), 1);
- int unFlushDataHold =
- WebParameterUtils.validIntDataParameter("unflushDataHold",
- jsonObject.get("unflushDataHold"),
- false, brokerConfEntity.getDftUnFlushDataHold(), 0);
- final boolean acceptPublish =
- WebParameterUtils.validBooleanDataParameter("acceptPublish",
- jsonObject.get("acceptPublish"),
- false, brokerConfEntity.isAcceptPublish());
- final boolean acceptSubscribe =
- WebParameterUtils.validBooleanDataParameter("acceptSubscribe",
- jsonObject.get("acceptSubscribe"),
- false, brokerConfEntity.isAcceptSubscribe());
- final int numTopicStores =
- WebParameterUtils.validIntDataParameter("numTopicStores",
- jsonObject.get("numTopicStores"),
- false, brokerConfEntity.getNumTopicStores(), 1);
- final int memCacheMsgCntInK =
- WebParameterUtils.validIntDataParameter("memCacheMsgCntInK",
- jsonObject.get("memCacheMsgCntInK"),
- false, brokerConfEntity.getDftMemCacheMsgCntInK(), 1);
- int memCacheMsgSizeInMB =
- WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
- jsonObject.get("memCacheMsgSizeInMB"),
- false, brokerConfEntity.getDftMemCacheMsgSizeInMB(), 2);
- memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
- int memCacheFlushIntvl =
- WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
- jsonObject.get("memCacheFlushIntvl"),
- false, brokerConfEntity.getDftMemCacheFlushIntvl(), 4000);
- int maxMsgSizeInMB =
- WebParameterUtils.validIntDataParameter("maxMsgSizeInMB",
- jsonObject.get("maxMsgSizeInMB"),
- false, TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
- TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
- String itemCreateUser =
- WebParameterUtils.validStringParameter("createUser",
- jsonObject.get("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH, false, null);
- Date itemCreateDate =
- WebParameterUtils.validDateParameter("createDate",
- jsonObject.get("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, null);
- if ((TStringUtils.isBlank(itemCreateUser)) || (itemCreateDate == null)) {
- itemCreateUser = createUser;
- itemCreateDate = createDate;
- }
- String attributes =
- strBuffer.append(TStoreConstants.TOKEN_STORE_NUM)
- .append(TokenConstants.EQ).append(numTopicStores)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_DATA_UNFLUSHHOLD)
- .append(TokenConstants.EQ).append(unFlushDataHold)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_CNT)
- .append(TokenConstants.EQ).append(memCacheMsgCntInK)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_SIZE)
- .append(TokenConstants.EQ).append(memCacheMsgSizeInMB)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL)
- .append(TokenConstants.EQ).append(memCacheFlushIntvl)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MAX_MSG_SIZE)
- .append(TokenConstants.EQ)
- .append(SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB))
- .toString();
- strBuffer.delete(0, strBuffer.length());
- batchAddItemKeys.add(inputKey);
- batchAddBdbTopicEntities.add(new BdbTopicConfEntity(brokerConfEntity.getBrokerId(),
- brokerConfEntity.getBrokerIp(), brokerConfEntity.getBrokerPort(),
- topicName, numPartitions, unflushThreshold, unflushInterval,
- deleteWhen, deletePolicy, acceptPublish, acceptSubscribe,
- numTopicStores, attributes, itemCreateUser, itemCreateDate,
- itemCreateUser, itemCreateDate));
- if (!batchAddTopicNames.contains(topicName)) {
- BdbTopicAuthControlEntity tmpTopicAuthControl =
- brokerConfManager.getBdbEnableAuthControlByTopicName(topicName);
- if (tmpTopicAuthControl == null) {
- batchTopicAuthInfos.add(
- new BdbTopicAuthControlEntity(topicName, false, createUser, createDate));
- }
- }
- batchAddTopicNames.add(topicName);
- } catch (Exception ee) {
- strBuffer.delete(0, strBuffer.length());
- throw new Exception(strBuffer.append("Process data exception, data is :")
- .append(jsonObject.toString()).append(", exception is : ")
- .append(ee.getMessage()).toString());
- }
- }
- inAddTopicConfigInfo(batchAddBdbTopicEntities, batchTopicAuthInfos);
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * Private method to add topic config info
- *
- * @param batchAddBdbTopicEntities
- * @param batchTopicAuthInfos
- * @throws Exception
- */
- private void inAddTopicConfigInfo(List<BdbTopicConfEntity> batchAddBdbTopicEntities,
- List<BdbTopicAuthControlEntity> batchTopicAuthInfos) throws Exception {
- boolean inserted = false;
- try {
- for (BdbTopicConfEntity itemBdbTopicEntity : batchAddBdbTopicEntities) { // for each topic
- BdbBrokerConfEntity brokerConfEntity =
- brokerConfManager.getBrokerDefaultConfigStoreInfo(itemBdbTopicEntity.getBrokerId());
- // if broker conf is not set, or the broker is busy with processing events,
- // skip this topic
- if (brokerConfEntity == null
- || WebParameterUtils.checkBrokerInProcessing(itemBdbTopicEntity.getBrokerId(),
- brokerConfManager, null)) {
- continue;
- }
- boolean result = brokerConfManager.confAddTopicConfig(itemBdbTopicEntity);
- if (result) { // if it succeeds in adding topic config
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- brokerConfManager.getBrokerRunSyncStatusInfo(itemBdbTopicEntity.getBrokerId());
- // set Fast start = false
- if (brokerSyncStatusInfo != null) {
- brokerSyncStatusInfo.setFastStart(false);
- }
- // update broker config
- if (!brokerConfEntity.isConfDataUpdated()) { // config data NOT updated
- brokerConfManager.updateBrokerConfChanged(brokerConfEntity.getBrokerId(),
- true, false);
- }
- }
- inserted = true;
- } // for each topic
-
- // if at least one topic is updated,
- // update topic authorization control
- if (inserted) {
- for (BdbTopicAuthControlEntity topicAuthControlEntity
- : batchTopicAuthInfos) {
- BdbTopicAuthControlEntity tmpTopicAuthControl =
- brokerConfManager.getBdbEnableAuthControlByTopicName(topicAuthControlEntity.getTopicName());
- if (tmpTopicAuthControl == null) {
- brokerConfManager.confSetBdbTopicAuthControl(topicAuthControlEntity);
- }
- }
- }
- } catch (Exception ee) {
- logger.warn("Fun.inAddTopicConfigInfo throw exception", ee);
- }
- }
-
- /**
- * Query topic info
- *
- * @param req
- * @return
- * @throws Exception
- */
- // #lizard forgives
- public StringBuilder adminQueryTopicCfgEntityAndRunInfo(HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- BdbTopicConfEntity webTopicEntity = new BdbTopicConfEntity();
- try {
- webTopicEntity
- .setDeleteWhen(WebParameterUtils.validDecodeStringParameter("deleteWhen",
- req.getParameter("deleteWhen"),
- TServerConstants.CFG_DELETEWHEN_MAX_LENGTH, false, null));
- webTopicEntity
- .setDeletePolicy(WebParameterUtils.validDeletePolicyParameter("deletePolicy",
- req.getParameter("deletePolicy"), false, null));
- webTopicEntity
- .setUnflushInterval(WebParameterUtils.validIntDataParameter("unflushInterval",
- req.getParameter("unflushInterval"),
- false, TBaseConstants.META_VALUE_UNDEFINED, 1));
- webTopicEntity
- .setUnflushThreshold(WebParameterUtils.validIntDataParameter("unflushThreshold",
- req.getParameter("unflushThreshold"),
- false, TBaseConstants.META_VALUE_UNDEFINED, 0));
- webTopicEntity
- .setUnflushDataHold(WebParameterUtils.validIntDataParameter("unflushDataHold",
- req.getParameter("unflushDataHold"),
- false, TBaseConstants.META_VALUE_UNDEFINED, 0));
- webTopicEntity
- .setTopicStatusId(WebParameterUtils.validIntDataParameter("topicStatusId",
- req.getParameter("topicStatusId"),
- false, TBaseConstants.META_VALUE_UNDEFINED, TStatusConstants.STATUS_TOPIC_OK));
- webTopicEntity
- .setNumPartitions(WebParameterUtils.validIntDataParameter("numPartitions",
- req.getParameter("numPartitions"),
- false, TBaseConstants.META_VALUE_UNDEFINED, 1));
- webTopicEntity
- .setCreateUser(WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH, false, null));
- webTopicEntity
- .setModifyUser(WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH, false, null));
- webTopicEntity
- .setNumTopicStores(WebParameterUtils.validIntDataParameter("numTopicStores",
- req.getParameter("numTopicStores"),
- false, TBaseConstants.META_VALUE_UNDEFINED, 1));
- webTopicEntity
- .setMemCacheMsgSizeInMB(WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
- req.getParameter("memCacheMsgSizeInMB"),
- false, TBaseConstants.META_VALUE_UNDEFINED, 2));
- webTopicEntity
- .setMemCacheMsgCntInK(WebParameterUtils.validIntDataParameter("memCacheMsgCntInK",
- req.getParameter("memCacheMsgCntInK"),
- false, TBaseConstants.META_VALUE_UNDEFINED, 1));
- webTopicEntity
- .setMemCacheFlushIntvl(WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
- req.getParameter("memCacheFlushIntvl"),
- false, TBaseConstants.META_VALUE_UNDEFINED, 4000));
- Set<Integer> batchBrokerIds =
- WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"), false);
- if (batchBrokerIds.size() == 1) {
- for (Integer brokerId : batchBrokerIds) {
- webTopicEntity.setBrokerId(brokerId);
- }
- }
- Set<String> batchOpTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"), false, false, null, strBuffer);
- if (batchOpTopicNames.size() == 1) {
- for (String topicName : batchOpTopicNames) {
- webTopicEntity.setTopicName(topicName);
- }
- }
- ConcurrentHashMap<String, List<BdbTopicConfEntity>> queryResultMap =
- brokerConfManager.getBdbTopicEntityMap(webTopicEntity);
- TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
- SimpleDateFormat formatter =
- new SimpleDateFormat(TBaseConstants.META_TMP_DATE_VALUE);
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
- int totalCount = 0;
- for (Map.Entry<String, List<BdbTopicConfEntity>> entry : queryResultMap.entrySet()) {
- if ((!batchOpTopicNames.isEmpty()) && (!batchOpTopicNames.contains(entry.getKey()))) {
- continue;
- }
- if (totalCount++ > 0) {
- strBuffer.append(",");
- }
- int totalCfgNumPartCount = 0;
- int totalRunNumPartCount = 0;
- boolean isSrvAcceptPublish = false;
- boolean isSrvAcceptSubscribe = false;
- boolean isAcceptPublish = false;
- boolean isAcceptSubscribe = false;
- strBuffer.append("{\"topicName\":\"").append(entry.getKey()).append("\",\"topicInfo\":[");
- int count = 0;
- for (BdbTopicConfEntity entity : entry.getValue()) {
- if ((!batchBrokerIds.isEmpty()) && (!batchBrokerIds.contains(entity.getBrokerId()))) {
- continue;
- }
- if (count++ > 0) {
- strBuffer.append(",");
- }
- totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
- strBuffer.append("{\"topicName\":\"").append(entity.getTopicName())
- .append("\",\"topicStatusId\":").append(entity.getTopicStatusId())
- .append(",\"brokerId\":").append(entity.getBrokerId())
- .append(",\"brokerIp\":\"").append(entity.getBrokerIp())
- .append("\",\"brokerPort\":").append(entity.getBrokerPort())
- .append(",\"numPartitions\":").append(entity.getNumPartitions())
- .append(",\"unflushThreshold\":").append(entity.getUnflushThreshold())
- .append(",\"unflushInterval\":").append(entity.getUnflushInterval())
- .append(",\"unflushDataHold\":").append(entity.getUnflushDataHold())
- .append(",\"deleteWhen\":\"").append(entity.getDeleteWhen())
- .append("\",\"deletePolicy\":\"").append(entity.getDeletePolicy())
- .append("\",\"acceptPublish\":").append(entity.getAcceptPublish())
- .append(",\"acceptSubscribe\":").append(entity.getAcceptSubscribe())
- .append(",\"numTopicStores\":").append(entity.getNumTopicStores())
- .append(",\"memCacheMsgSizeInMB\":").append(entity.getMemCacheMsgSizeInMB())
- .append(",\"memCacheFlushIntvl\":").append(entity.getMemCacheFlushIntvl())
- .append(",\"memCacheMsgCntInK\":").append(entity.getMemCacheMsgCntInK())
- .append(",\"maxMsgSizeInMB\":")
- .append(entity.getMaxMsgSize() / TBaseConstants.META_MB_UNIT_SIZE)
- .append(",\"createUser\":\"").append(entity.getCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(entity.getCreateDate()))
- .append("\",\"modifyUser\":\"").append(entity.getModifyUser())
- .append("\",\"modifyDate\":\"").append(formatter.format(entity.getModifyDate()))
- .append("\",\"runInfo\":{");
- String strManageStatus = "-";
- BdbBrokerConfEntity brokerConfEntity =
- brokerConfManager.getBrokerDefaultConfigStoreInfo(entity.getBrokerId());
- if (brokerConfEntity != null) {
- int manageStatus = brokerConfEntity.getManageStatus();
- strManageStatus = WebParameterUtils.getBrokerManageStatusStr(manageStatus);
- Tuple2<Boolean, Boolean> pubSubStatus =
- WebParameterUtils.getPubSubStatusByManageStatus(manageStatus);
- isAcceptPublish = pubSubStatus.getF0();
- isAcceptSubscribe = pubSubStatus.getF1();
- }
- BrokerInfo broker =
- new BrokerInfo(entity.getBrokerId(), entity.getBrokerIp(), entity.getBrokerPort());
- TopicInfo topicInfo = topicPSInfoManager.getTopicInfo(entity.getTopicName(), broker);
- if (topicInfo == null) {
- strBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
- .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
- } else {
- if (isAcceptPublish) {
- strBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish());
- if (topicInfo.isAcceptPublish()) {
- isSrvAcceptPublish = true;
- }
- } else {
- strBuffer.append("\"acceptPublish\":false");
- }
- if (isAcceptSubscribe) {
- strBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe());
- if (topicInfo.isAcceptSubscribe()) {
- isSrvAcceptSubscribe = true;
- }
- } else {
- strBuffer.append(",\"acceptSubscribe\":false");
- }
- totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
- strBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum())
- .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum())
- .append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\"");
- }
- strBuffer.append("}}");
- }
- strBuffer.append("],\"infoCount\":").append(count)
- .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
- .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
- .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
- .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount)
- .append(",\"authData\":{");
- BdbTopicAuthControlEntity authEntity =
- brokerConfManager.getBdbEnableAuthControlByTopicName(entry.getKey());
- if (authEntity != null) {
- strBuffer.append("\"enableAuthControl\":").append(authEntity.isEnableAuthControl())
- .append(",\"createUser\":\"").append(authEntity.getCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(authEntity.getCreateDate()))
- .append("\",\"authConsumeGroup\":[");
- List<BdbConsumerGroupEntity> webConsumerGroupEntities =
- brokerConfManager.getBdbAllowedConsumerGroups(entry.getKey());
- int countJ = 0;
- for (BdbConsumerGroupEntity groupEntity : webConsumerGroupEntities) {
- if (countJ++ > 0) {
- strBuffer.append(",");
- }
- strBuffer.append("{\"groupName\":\"").append(groupEntity.getConsumerGroupName())
- .append("\",\"createUser\":\"").append(groupEntity.getRecordCreateUser())
- .append("\",\"createDate\":\"")
- .append(formatter.format(groupEntity.getRecordCreateDate()))
- .append("\"}");
- }
- strBuffer.append("],\"groupCount\":").append(countJ).append(",\"authFilterCondSet\":[");
- List<BdbGroupFilterCondEntity> filterConds =
- brokerConfManager.getBdbAllowedGroupFilterConds(entry.getKey());
- int countY = 0;
- for (BdbGroupFilterCondEntity itemCond : filterConds) {
- if (countY++ > 0) {
- strBuffer.append(",");
- }
- strBuffer.append("{\"groupName\":\"").append(itemCond.getConsumerGroupName())
- .append("\",\"condStatus\":").append(itemCond.getControlStatus());
- if (itemCond.getFilterCondStr().length() <= 2) {
- strBuffer.append(",\"filterConds\":\"\"");
- } else {
- strBuffer.append(",\"filterConds\":\"").append(itemCond.getFilterCondStr()).append("\"");
- }
- strBuffer.append(",\"createUser\":\"").append(itemCond.getCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(itemCond.getCreateDate()))
- .append("\"}");
- }
- strBuffer.append("],\"filterCount\":").append(countY);
- }
- strBuffer.append("}}");
- }
- strBuffer.append("],\"dataCount\":").append(totalCount).append("}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\",\"dataCount\":0,\"data\":[]}");
- }
- return strBuffer;
- }
-
- /**
-  * Query the topic names configured on the requested brokers.
-  *
-  * <p>Reads the optional {@code WebFieldDef.COMPSBROKERID} parameter from the request,
-  * asks {@code brokerConfManager.getBrokerTopicConfigInfo} for the matching
-  * broker-to-topic-names map and renders it as JSON text.
-  *
-  * @param req the HTTP request carrying the optional broker-id parameter
-  * @return the JSON response text; a failure result built by
-  *         {@code WebParameterUtils.buildFailResult} when the parameter cannot be parsed,
-  *         otherwise one {@code {brokerId, topicName[], topicCount}} item per broker
-  *         plus the total {@code dataCount}
-  */
- public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) {
-     ProcessResult parseResult = new ProcessResult();
-     StringBuilder jsonBuf = new StringBuilder(512);
-     boolean parsed = WebParameterUtils.getIntParamValue(req,
-             WebFieldDef.COMPSBROKERID, false, jsonBuf, parseResult);
-     if (!parsed) {
-         WebParameterUtils.buildFailResult(jsonBuf, parseResult.errInfo);
-         return jsonBuf;
-     }
-     Set<Integer> requestedBrokerIds = (Set<Integer>) parseResult.retData1;
-     jsonBuf.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
-     Map<Integer, Set<String>> topicsByBroker =
-             brokerConfManager.getBrokerTopicConfigInfo(requestedBrokerIds);
-     int brokerCount = 0;
-     for (Map.Entry<Integer, Set<String>> brokerEntry : topicsByBroker.entrySet()) {
-         // separate items with a comma, never before the first one
-         if (brokerCount > 0) {
-             jsonBuf.append(",");
-         }
-         brokerCount++;
-         jsonBuf.append("{\"brokerId\":").append(brokerEntry.getKey()).append(",\"topicName\":[");
-         int topicCount = 0;
-         for (String topicName : brokerEntry.getValue()) {
-             if (topicCount > 0) {
-                 jsonBuf.append(",");
-             }
-             topicCount++;
-             jsonBuf.append("\"").append(topicName).append("\"");
-         }
-         jsonBuf.append("],\"topicCount\":").append(topicCount).append("}");
-     }
-     jsonBuf.append("],\"dataCount\":").append(brokerCount).append("}");
-     return jsonBuf;
- }
-
- /**
-  * Query the ids (and optionally the IPs) of the brokers that host the requested topics.
-  *
-  * <p>Reads the optional {@code WebFieldDef.COMPSTOPICNAME} and {@code WebFieldDef.WITHIP}
-  * parameters, asks {@code brokerConfManager.getTopicBrokerConfigInfo} for the
-  * topic-to-brokers map and renders it as JSON text. With {@code withIp} set, each broker
-  * is written as a {@code {brokerId, brokerIp}} object; otherwise only the bare id is written.
-  *
-  * @param req the HTTP request carrying the optional topic-name and with-ip parameters
-  * @return the JSON response text; a failure result built by
-  *         {@code WebParameterUtils.buildFailResult} when a parameter cannot be parsed,
-  *         otherwise one {@code {topicName, brokerInfo[], brokerCnt}} item per topic
-  *         plus the total {@code dataCount}
-  */
- public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) {
-     ProcessResult parseResult = new ProcessResult();
-     StringBuilder jsonBuf = new StringBuilder(512);
-     boolean topicParsed = WebParameterUtils.getStringParamValue(req,
-             WebFieldDef.COMPSTOPICNAME, false, null, jsonBuf, parseResult);
-     if (!topicParsed) {
-         WebParameterUtils.buildFailResult(jsonBuf, parseResult.errInfo);
-         return jsonBuf;
-     }
-     // read the topic set before parseResult is reused for the next parameter
-     Set<String> requestedTopics = (Set<String>) parseResult.retData1;
-     boolean flagParsed = WebParameterUtils.getBooleanParamValue(req,
-             WebFieldDef.WITHIP, false, false, jsonBuf, parseResult);
-     if (!flagParsed) {
-         WebParameterUtils.buildFailResult(jsonBuf, parseResult.errInfo);
-         return jsonBuf;
-     }
-     boolean withIp = (Boolean) parseResult.retData1;
-     jsonBuf.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
-     Map<String, Map<Integer, String>> brokersByTopic =
-             brokerConfManager.getTopicBrokerConfigInfo(requestedTopics);
-     int topicCount = 0;
-     for (Map.Entry<String, Map<Integer, String>> topicEntry : brokersByTopic.entrySet()) {
-         if (topicCount > 0) {
-             jsonBuf.append(",");
-         }
-         topicCount++;
-         jsonBuf.append("{\"topicName\":\"").append(topicEntry.getKey()).append("\",\"brokerInfo\":[");
-         int brokerCount = 0;
-         for (Map.Entry<Integer, String> brokerEntry : topicEntry.getValue().entrySet()) {
-             if (brokerCount > 0) {
-                 jsonBuf.append(",");
-             }
-             brokerCount++;
-             if (withIp) {
-                 jsonBuf.append("{\"brokerId\":").append(brokerEntry.getKey())
-                         .append(",\"brokerIp\":\"").append(brokerEntry.getValue()).append("\"}");
-             } else {
-                 jsonBuf.append(brokerEntry.getKey());
-             }
-         }
-         jsonBuf.append("],\"brokerCnt\":").append(brokerCount).append("}");
-     }
-     jsonBuf.append("],\"dataCount\":").append(topicCount).append("}");
-     return jsonBuf;
- }
-
- /**
- * Delete topic info: delegates to innModifyTopicStatusEntityInfo with the target
- * status TStatusConstants.STATUS_TOPIC_SOFT_DELETE.
- *
- * @param req the HTTP request, forwarded unchanged to the delegate
- * @return the JSON response text produced by the delegate
- * @throws Exception declared by the delegate
- */
- public StringBuilder adminDeleteTopicEntityInfo(HttpServletRequest req) throws Exception {
- return innModifyTopicStatusEntityInfo(req, TStatusConstants.STATUS_TOPIC_SOFT_DELETE);
- }
-
- /**
- * Remove topic info: delegates to innModifyTopicStatusEntityInfo with the target
- * status TStatusConstants.STATUS_TOPIC_SOFT_REMOVE.
- *
- * @param req the HTTP request, forwarded unchanged to the delegate
- * @return the JSON response text produced by the delegate
- * @throws Exception declared by the delegate
- */
- public StringBuilder adminRemoveTopicEntityInfo(HttpServletRequest req) throws Exception {
- return innModifyTopicStatusEntityInfo(req, TStatusConstants.STATUS_TOPIC_SOFT_REMOVE);
- }
-
- /**
-  * Internal method shared by topic deletion and removal: moves the requested topic
-  * records on the requested brokers to {@code topicStatusId}.
-  *
-  * <p>The work is done in two passes. The validation pass fails the whole request
-  * (error JSON) when a target broker is reported busy by
-  * {@code WebParameterUtils.checkBrokerInProcessing} or unloaded by
-  * {@code WebParameterUtils.checkBrokerUnLoad}, has no topic configured, lacks one of
-  * the requested topics, or when a topic still accepts publish or subscribe. Records
-  * that are not in the expected source status are skipped. The apply pass re-reads each
-  * selected record, re-checks the same conditions (skipping instead of failing), clears
-  * the topic's authorization control data, stores the new status and flags the broker
-  * configuration as changed.
-  *
-  * @param req           the HTTP request; reads the confModAuthToken, modifyUser,
-  *                      topicName and brokerId parameters
-  * @param topicStatusId the status to store; for STATUS_TOPIC_SOFT_DELETE only records
-  *                      with a valid topic status are changed, for
-  *                      STATUS_TOPIC_SOFT_REMOVE only records currently in
-  *                      STATUS_TOPIC_SOFT_DELETE are changed
-  * @return the JSON response text: the OK result, or a result with errCode 400 and
-  *         the exception message when authorization or validation fails
-  * @throws Exception declared for callers; failures raised inside the method body
-  *                   are caught and rendered as the error result
-  */
- // #lizard forgives
- private StringBuilder innModifyTopicStatusEntityInfo(HttpServletRequest req,
-                                                      int topicStatusId) throws Exception {
-     StringBuilder strBuffer = new StringBuilder(512);
-     try {
-         // Check if the request is authorized
-         // and the parameters are valid
-         WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
-                 req.getParameter("confModAuthToken"));
-         String modifyUser =
-                 WebParameterUtils.validStringParameter("modifyUser",
-                         req.getParameter("modifyUser"),
-                         TBaseConstants.META_MAX_USERNAME_LENGTH, true, "");
-         Set<String> batchRmvTopicNames =
-                 WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
-                         true, false, null, strBuffer);
-         Set<BdbBrokerConfEntity> batchInputTopicEntitySet =
-                 WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"),
-                         brokerConfManager, true, strBuffer);
-         Set<Integer> changedBrokerSet = new HashSet<>();
-         Set<BdbTopicConfEntity> batchRmvBdbTopicEntitySet =
-                 new HashSet<>();
-
-         // Validation pass: for each target broker, check its status
-         // and check the config of each topic to see if the action could be performed
-         for (BdbBrokerConfEntity brokerConfEntity : batchInputTopicEntitySet) {
-             if (brokerConfEntity == null) { // skip brokers whose config is not set
-                 continue;
-             }
-             if (WebParameterUtils.checkBrokerInProcessing(brokerConfEntity.getBrokerId(), brokerConfManager,
-                     strBuffer)) { // reject the request: broker is busy processing events
-                 throw new Exception(strBuffer.toString());
-             }
-             if (WebParameterUtils.checkBrokerUnLoad(brokerConfEntity.getBrokerId(), brokerConfManager, strBuffer)) {
-                 // reject the request: broker config is not loaded
-                 throw new Exception(strBuffer.toString());
-             }
-             ConcurrentHashMap<String /* topic name */, BdbTopicConfEntity> brokerTopicEntityMap =
-                     brokerConfManager.getBrokerTopicConfEntitySet(brokerConfEntity.getBrokerId());
-             if ((brokerTopicEntityMap == null)
-                     || (brokerTopicEntityMap.isEmpty())) { // no topic configured on the broker
-                 throw new Exception(strBuffer.append("No topic configure in broker=")
-                         .append(brokerConfEntity.getBrokerId())
-                         .append(", please confirm the configure first!").toString());
-             }
-             for (String itemTopicName : batchRmvTopicNames) { // for each topic to change
-                 BdbTopicConfEntity bdbTopicConfEntity =
-                         brokerTopicEntityMap.get(itemTopicName);
-                 if (bdbTopicConfEntity == null) { // topic entity does not exist on the broker
-                     throw new Exception(strBuffer.append("Not found the topic ")
-                             .append(itemTopicName)
-                             .append("'s configure in broker=")
-                             .append(brokerConfEntity.getBrokerId())
-                             .append(", please confirm the configure first!").toString());
-                 }
-                 if (bdbTopicConfEntity.getAcceptPublish()
-                         || bdbTopicConfEntity.getAcceptSubscribe()) { // still accepts publish or subscribe
-                     throw new Exception(strBuffer.append("The topic ").append(itemTopicName)
-                             .append("'s acceptPublish and acceptSubscribe parameters must be false in broker=")
-                             .append(brokerConfEntity.getBrokerId())
-                             .append(" before topic deleted!").toString());
-                 }
-                 if (topicStatusId == TStatusConstants.STATUS_TOPIC_SOFT_DELETE) {
-                     if (!bdbTopicConfEntity.isValidTopicStatus()) { // not in a valid status, nothing to delete
-                         continue;
-                     }
-                 } else if (topicStatusId == TStatusConstants.STATUS_TOPIC_SOFT_REMOVE) {
-                     // only soft-deleted records may be moved on to soft-remove
-                     if (bdbTopicConfEntity.getTopicStatusId() != TStatusConstants.STATUS_TOPIC_SOFT_DELETE) {
-                         continue;
-                     }
-                 }
-                 BdbTopicConfEntity queryEntity = new BdbTopicConfEntity();
-                 queryEntity.setBrokerAndTopicInfo(brokerConfEntity.getBrokerId(),
-                         brokerConfEntity.getBrokerIp(),
-                         brokerConfEntity.getBrokerPort(),
-                         itemTopicName);
-                 batchRmvBdbTopicEntitySet.add(queryEntity);
-             }
-         }
-
-         // Apply pass: re-read every selected record, check again and perform the action
-         try {
-             for (BdbTopicConfEntity itemTopicEntity : batchRmvBdbTopicEntitySet) {
-                 BdbBrokerConfEntity brokerConfEntity =
-                         brokerConfManager.getBrokerDefaultConfigStoreInfo(itemTopicEntity.getBrokerId());
-                 if (brokerConfEntity == null) { // skip those brokers whose config is not set
-                     continue;
-                 }
-                 ConcurrentHashMap<String, BdbTopicConfEntity> brokerTopicEntityMap =
-                         brokerConfManager.getBrokerTopicConfEntitySet(brokerConfEntity.getBrokerId());
-                 if ((brokerTopicEntityMap == null)
-                         || (brokerTopicEntityMap.isEmpty())) { // no topic configured on the broker
-                     continue;
-                 }
-                 BdbTopicConfEntity bdbTopicConfEntity =
-                         brokerTopicEntityMap.get(itemTopicEntity.getTopicName());
-                 if (bdbTopicConfEntity == null) {
-                     continue;
-                 }
-                 // Same precondition as the validation pass: both flags must be false,
-                 // so skip as soon as either publish or subscribe is accepted again
-                 if (bdbTopicConfEntity.getAcceptPublish()
-                         || bdbTopicConfEntity.getAcceptSubscribe()) {
-                     continue;
-                 }
-                 if (topicStatusId == TStatusConstants.STATUS_TOPIC_SOFT_DELETE) {
-                     if (!bdbTopicConfEntity.isValidTopicStatus()) {
-                         continue;
-                     }
-                 } else if (topicStatusId == TStatusConstants.STATUS_TOPIC_SOFT_REMOVE) {
-                     if (bdbTopicConfEntity.getTopicStatusId() != TStatusConstants.STATUS_TOPIC_SOFT_DELETE) {
-                         continue;
-                     }
-                 }
-                 if (WebParameterUtils.checkBrokerInProcessing(brokerConfEntity.getBrokerId(), brokerConfManager,
-                         null)) { // broker is busy processing event
-                     continue;
-                 }
-                 if (WebParameterUtils.checkBrokerUnLoad(brokerConfEntity.getBrokerId(), brokerConfManager, null)) {
-                     // an unloaded broker is only acceptable when this request itself changed it
-                     if (!changedBrokerSet.contains(brokerConfEntity.getBrokerId())) {
-                         continue;
-                     }
-                 }
-                 inRmvTopicAuthControlInfo(itemTopicEntity.getTopicName(), modifyUser);
-                 bdbTopicConfEntity.setTopicStatusId(topicStatusId);
-                 boolean result = brokerConfManager.confModTopicConfig(bdbTopicConfEntity);
-                 if (result) {
-                     if (!brokerConfEntity.isConfDataUpdated()) {
-                         brokerConfManager.updateBrokerConfChanged(brokerConfEntity.getBrokerId(), true, true);
-                         changedBrokerSet.add(brokerConfEntity.getBrokerId());
-                     }
-                 }
-             }
-         } catch (Exception ee) {
-             // The apply pass stays best-effort (the OK result is still returned),
-             // but the failure is no longer dropped without a trace
-             logger.warn("Fun.innModifyTopicStatusEntityInfo throw exception", ee);
-         }
-         strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
-     } catch (Exception e) {
-         // discard any partially built text and answer with the failure reason
-         strBuffer.delete(0, strBuffer.length());
-         strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
-                 .append(e.getMessage()).append("\"}");
-     }
-     return strBuffer;
- }
-
- /**
-  * Redo delete topic info: moves soft-deleted topic records on the requested brokers
-  * back to {@code TStatusConstants.STATUS_TOPIC_OK}.
-  *
-  * <p>The validation pass fails the whole request (error JSON) when a target broker is
-  * reported busy by {@code WebParameterUtils.checkBrokerInProcessing} or unloaded by
-  * {@code WebParameterUtils.checkBrokerUnLoad}, has no topic configured, lacks one of
-  * the requested topics, when a topic still accepts publish or subscribe, or when a
-  * topic is neither soft-deleted nor in a valid status. Topics in a valid status are
-  * skipped. The apply pass re-reads each selected record, re-checks the conditions
-  * (skipping instead of failing), clears the topic's authorization control data, stores
-  * the OK status, creates a disabled authorization control record when none exists and
-  * flags the broker configuration as changed.
-  *
-  * @param req the HTTP request; reads the confModAuthToken, modifyUser, topicName and
-  *            brokerId parameters
-  * @return the JSON response text: the OK result, or a result with errCode 400 and
-  *         the exception message when authorization or validation fails
-  * @throws Exception declared for callers; failures raised inside the method body
-  *                   are caught and rendered as the error result
-  */
- // #lizard forgives
- public StringBuilder adminRedoDeleteTopicEntityInfo(HttpServletRequest req) throws Exception {
-     StringBuilder strBuffer = new StringBuilder(512);
-     try {
-         WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager, req.getParameter("confModAuthToken"));
-         String modifyUser =
-                 WebParameterUtils.validStringParameter("modifyUser", req.getParameter("modifyUser"),
-                         TBaseConstants.META_MAX_USERNAME_LENGTH, true, "");
-         Set<String> batchRmvTopicNames =
-                 WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"), true, false, null, strBuffer);
-         Set<BdbBrokerConfEntity> batchBrokerEntitySet =
-                 WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"), brokerConfManager, true,
-                         strBuffer);
-         List<BdbTopicConfEntity> batchRmvBdbTopicEntities = new ArrayList<>();
-         // Validation pass: any violated precondition rejects the whole request
-         for (BdbBrokerConfEntity brokerConfEntity : batchBrokerEntitySet) {
-             if (brokerConfEntity == null) {
-                 continue;
-             }
-             if (WebParameterUtils.checkBrokerInProcessing(brokerConfEntity.getBrokerId(), brokerConfManager,
-                     strBuffer)) {
-                 throw new Exception(strBuffer.toString());
-             }
-             if (WebParameterUtils.checkBrokerUnLoad(brokerConfEntity.getBrokerId(), brokerConfManager, strBuffer)) {
-                 throw new Exception(strBuffer.toString());
-             }
-             ConcurrentHashMap<String, BdbTopicConfEntity> brokerTopicEntityMap =
-                     brokerConfManager.getBrokerTopicConfEntitySet(brokerConfEntity.getBrokerId());
-             if ((brokerTopicEntityMap == null) || (brokerTopicEntityMap.isEmpty())) {
-                 throw new Exception(strBuffer.append("No topic configure in broker=")
-                         .append(brokerConfEntity.getBrokerId())
-                         .append(", please confirm the configure first!").toString());
-             }
-             for (String itemTopicName : batchRmvTopicNames) {
-                 BdbTopicConfEntity bdbTopicConfEntity = brokerTopicEntityMap.get(itemTopicName);
-                 if (bdbTopicConfEntity == null) {
-                     throw new Exception(strBuffer.append("Not found the topic ").append(itemTopicName)
-                             .append("'s configure in broker=").append(brokerConfEntity.getBrokerId())
-                             .append(", please confirm the configure first!").toString());
-                 }
-                 if (bdbTopicConfEntity.getAcceptPublish() || bdbTopicConfEntity.getAcceptSubscribe()) {
-                     throw new Exception(strBuffer.append("The topic ").append(itemTopicName)
-                             .append("'s acceptPublish and acceptSubscribe parameters must be false in broker=")
-                             .append(brokerConfEntity.getBrokerId()).append(" before topic deleted!").toString());
-                 }
-                 if (bdbTopicConfEntity.getTopicStatusId() != TStatusConstants.STATUS_TOPIC_SOFT_DELETE) {
-                     if (bdbTopicConfEntity.isValidTopicStatus()) {
-                         // not deleted at all: nothing to redo for this record
-                         continue;
-                     } else {
-                         // leading space added so the text no longer fuses with the topic name
-                         throw new Exception(strBuffer.append("Topic of ").append(itemTopicName)
-                                 .append(" is in removing flow in brokerId=").append(brokerConfEntity.getBrokerId())
-                                 .append(", please wait until remove process finished!").toString());
-                     }
-                 }
-                 BdbTopicConfEntity queryEntity = new BdbTopicConfEntity();
-                 queryEntity.setBrokerAndTopicInfo(brokerConfEntity.getBrokerId(),
-                         brokerConfEntity.getBrokerIp(), brokerConfEntity.getBrokerPort(), itemTopicName);
-                 batchRmvBdbTopicEntities.add(queryEntity);
-             }
-         }
-         // Apply pass: re-read every selected record, check again and restore it
-         try {
-             Set<Integer> changedBrokerSet = new HashSet<>();
-             for (BdbTopicConfEntity itemTopicEntity : batchRmvBdbTopicEntities) {
-                 BdbBrokerConfEntity brokerConfEntity =
-                         brokerConfManager.getBrokerDefaultConfigStoreInfo(itemTopicEntity.getBrokerId());
-                 if (brokerConfEntity == null) {
-                     continue;
-                 }
-                 ConcurrentHashMap<String, BdbTopicConfEntity> brokerTopicEntityMap =
-                         brokerConfManager.getBrokerTopicConfEntitySet(brokerConfEntity.getBrokerId());
-                 if ((brokerTopicEntityMap == null) || (brokerTopicEntityMap.isEmpty())) {
-                     continue;
-                 }
-                 BdbTopicConfEntity bdbTopicConfEntity = brokerTopicEntityMap.get(itemTopicEntity.getTopicName());
-                 // Same precondition as the validation pass: skip as soon as either
-                 // publish or subscribe is accepted, not only when both are
-                 if ((bdbTopicConfEntity == null)
-                         || (bdbTopicConfEntity.getAcceptPublish() || bdbTopicConfEntity.getAcceptSubscribe())
-                         || (bdbTopicConfEntity.getTopicStatusId() != TStatusConstants.STATUS_TOPIC_SOFT_DELETE)
-                         || (WebParameterUtils
-                         .checkBrokerInProcessing(brokerConfEntity.getBrokerId(), brokerConfManager, null))) {
-                     continue;
-                 }
-                 if (WebParameterUtils.checkBrokerUnLoad(brokerConfEntity.getBrokerId(), brokerConfManager, null)) {
-                     // an unloaded broker is only acceptable when this request itself changed it
-                     if (!changedBrokerSet.contains(brokerConfEntity.getBrokerId())) {
-                         continue;
-                     }
-                 }
-                 inRmvTopicAuthControlInfo(itemTopicEntity.getTopicName(), modifyUser);
-                 bdbTopicConfEntity.setTopicStatusId(TStatusConstants.STATUS_TOPIC_OK);
-                 boolean result = brokerConfManager.confModTopicConfig(bdbTopicConfEntity);
-                 BdbTopicAuthControlEntity tmpTopicAuthControl =
-                         brokerConfManager.getBdbEnableAuthControlByTopicName(bdbTopicConfEntity.getTopicName());
-                 if (tmpTopicAuthControl == null) {
-                     // make sure the restored topic has an authorization control record (disabled)
-                     brokerConfManager.confSetBdbTopicAuthControl(
-                             new BdbTopicAuthControlEntity(bdbTopicConfEntity
-                                     .getTopicName(), false, modifyUser, new Date()));
-                 }
-                 if (result) {
-                     if (!brokerConfEntity.isConfDataUpdated()) {
-                         brokerConfManager.updateBrokerConfChanged(
-                                 brokerConfEntity.getBrokerId(), true, true);
-                         changedBrokerSet.add(brokerConfEntity.getBrokerId());
-                     }
-                 }
-             }
-         } catch (Exception ee) {
-             // The apply pass stays best-effort (the OK result is still returned),
-             // but the failure is no longer dropped without a trace
-             logger.warn("Fun.adminRedoDeleteTopicEntityInfo throw exception", ee);
-         }
-         strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
-     } catch (Exception e) {
-         // discard any partially built text and answer with the failure reason
-         strBuffer.delete(0, strBuffer.length());
-         strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
-                 .append(e.getMessage()).append("\"}");
-     }
-     return strBuffer;
- }
-
- /**
-  * Private method to remove a topic's authorization control info.
-  *
-  * <p>Does nothing when no authorization control record is found for the topic.
-  * Otherwise deletes, in this order, the topic's group filter conditions (if any),
-  * its allowed consumer groups (if any) and the authorization control record itself.
-  * Exceptions raised while deleting are logged as warnings and not rethrown.
-  *
-  * @param topicName  the topic whose authorization data is removed
-  * @param modifyUser the operator name recorded on the entities passed to the delete calls
-  * @throws Exception declared for callers; the lookup of the authorization control
-  *                   record is not covered by the internal catch
-  */
- private void inRmvTopicAuthControlInfo(final String topicName, final String modifyUser) throws Exception {
-     BdbTopicAuthControlEntity authQuery = new BdbTopicAuthControlEntity();
-     authQuery.setTopicName(topicName);
-     List<BdbTopicAuthControlEntity> existingAuthRecords =
-             brokerConfManager.confGetBdbTopicAuthCtrlEntityList(authQuery);
-     if (existingAuthRecords.isEmpty()) {
-         return;
-     }
-     try {
-         // 1. filter conditions bound to the topic
-         BdbGroupFilterCondEntity filterQuery = new BdbGroupFilterCondEntity();
-         filterQuery.setTopicName(topicName);
-         List<BdbGroupFilterCondEntity> existingFilters =
-                 brokerConfManager.confGetBdbAllowedGroupFilterCondSet(filterQuery);
-         if (!existingFilters.isEmpty()) {
-             filterQuery.setCreateUser(modifyUser);
-             brokerConfManager.confDelBdbAllowedGroupFilterCondSet(filterQuery);
-         }
-         // 2. consumer groups allowed on the topic
-         BdbConsumerGroupEntity groupQuery = new BdbConsumerGroupEntity();
-         groupQuery.setGroupTopicName(topicName);
-         List<BdbConsumerGroupEntity> existingGroups =
-                 brokerConfManager.confGetBdbAllowedConsumerGroupSet(groupQuery);
-         if (!existingGroups.isEmpty()) {
-             groupQuery.setRecordCreateUser(modifyUser);
-             brokerConfManager.confDelBdbAllowedConsumerGroupSet(groupQuery);
-         }
-         // 3. the authorization control record itself
-         BdbTopicAuthControlEntity authToDelete = new BdbTopicAuthControlEntity();
-         authToDelete.setTopicName(topicName);
-         authToDelete.setCreateUser(modifyUser);
-         brokerConfManager.confDeleteBdbTopicAuthControl(authToDelete);
-     } catch (Exception e) {
-         logger.warn("Fun.inRmvTopicAuthControlInfo throw exception", e);
-     }
- }
-
- /**
-  * Query broker topic config info: one JSON item per broker with its management
-  * status and the aggregated partition and topic-store numbers of the queried topics.
-  *
-  * <p>When the optional topicName parameter is given, only brokers that hold a record
-  * of one of those topics are listed, each broker once; without it, all brokers from
-  * {@code brokerConfManager.getBrokerConfStoreMap()} are listed. Brokers without an
-  * entry in the run-sync map are rendered with "-" placeholders.
-  *
-  * @param req the HTTP request; reads the optional topicName parameter
-  * @return the JSON response text: the OK result with the {@code data} items and
-  *         {@code dataCount}, or a result with errCode 400 and the exception message
-  * @throws Exception declared for callers; failures raised inside the method body
-  *                   are caught and rendered as the error result
-  */
- // #lizard forgives
- public StringBuilder adminQueryBrokerTopicCfgAndRunInfo(
-         HttpServletRequest req) throws Exception {
-     StringBuilder strBuffer = new StringBuilder(512);
-     BdbTopicConfEntity webTopicEntity = new BdbTopicConfEntity();
-     try {
-         Set<String> batchOpTopicNames =
-                 WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
-                         false, false, null, strBuffer);
-         boolean hasCond = !batchOpTopicNames.isEmpty();
-         // a single topic name can be pushed down into the query entity
-         if (batchOpTopicNames.size() == 1) {
-             for (String topicName : batchOpTopicNames) {
-                 webTopicEntity.setTopicName(topicName);
-             }
-         }
-         webTopicEntity.setTopicStatusId(TBaseConstants.META_VALUE_UNDEFINED);
-         webTopicEntity.setNumTopicStores(TBaseConstants.META_VALUE_UNDEFINED);
-         webTopicEntity.setMemCacheMsgSizeInMB(TBaseConstants.META_VALUE_UNDEFINED);
-         webTopicEntity.setMemCacheMsgCntInK(TBaseConstants.META_VALUE_UNDEFINED);
-         webTopicEntity.setMemCacheFlushIntvl(TBaseConstants.META_VALUE_UNDEFINED);
-         webTopicEntity.setUnflushDataHold(TBaseConstants.META_VALUE_UNDEFINED);
-         Map<Integer, BdbBrokerConfEntity> totalBrokers =
-                 brokerConfManager.getBrokerConfStoreMap();
-         Map<Integer, BrokerSyncStatusInfo> brokerSyncStatusInfoMap =
-                 brokerConfManager.getBrokerRunSyncManageMap();
-         Map<String, List<BdbTopicConfEntity>> topicQueryResults =
-                 brokerConfManager.getBdbTopicEntityMap(webTopicEntity);
-         // Several topic names are not pushed into the query entity, so keep only
-         // the requested topics here (the map is keyed by topic name)
-         List<List<BdbTopicConfEntity>> matchedTopicLists = new ArrayList<>();
-         for (Map.Entry<String, List<BdbTopicConfEntity>> entry : topicQueryResults.entrySet()) {
-             if (hasCond && !batchOpTopicNames.contains(entry.getKey())) {
-                 continue;
-             }
-             List<BdbTopicConfEntity> topicConfEntities = entry.getValue();
-             if (topicConfEntities == null || topicConfEntities.isEmpty()) {
-                 continue;
-             }
-             matchedTopicLists.add(topicConfEntities);
-         }
-         List<Integer> brokerIds = new ArrayList<>();
-         if (hasCond) {
-             // a broker may hold several of the requested topics: list it only once
-             Set<Integer> seenBrokerIds = new HashSet<>();
-             for (List<BdbTopicConfEntity> topicConfEntities : matchedTopicLists) {
-                 for (BdbTopicConfEntity topicConfEntity : topicConfEntities) {
-                     if (topicConfEntity == null) {
-                         continue;
-                     }
-                     Integer brokerId = topicConfEntity.getBrokerId();
-                     if (seenBrokerIds.add(brokerId)) {
-                         brokerIds.add(brokerId);
-                     }
-                 }
-             }
-         } else {
-             for (Integer brokerId : totalBrokers.keySet()) {
-                 if (brokerId == null) {
-                     continue;
-                 }
-                 brokerIds.add(brokerId);
-             }
-         }
-         strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
-         int totalCount = 0;
-         for (Integer brokerId : brokerIds) {
-             BdbBrokerConfEntity brokerEntity = totalBrokers.get(brokerId);
-             if (brokerEntity == null) {
-                 continue;
-             }
-             if (totalCount++ > 0) {
-                 strBuffer.append(",");
-             }
-             boolean isAcceptPublish = false;
-             boolean isAcceptSubscribe = false;
-             int totalNumPartCount = 0;
-             int totalStoreNum = 0;
-             strBuffer.append("{\"brokerId\":").append(brokerEntity.getBrokerId())
-                     .append(",\"brokerIp\":\"").append(brokerEntity.getBrokerIp())
-                     .append("\",\"brokerPort\":").append(brokerEntity.getBrokerPort())
-                     .append(",\"runInfo\":{");
-             String strManageStatus = "-";
-             BdbBrokerConfEntity brokerConfEntity =
-                     brokerConfManager.getBrokerDefaultConfigStoreInfo(brokerEntity.getBrokerId());
-             if (brokerConfEntity != null) {
-                 int manageStatus = brokerConfEntity.getManageStatus();
-                 strManageStatus = WebParameterUtils.getBrokerManageStatusStr(manageStatus);
-                 Tuple2<Boolean, Boolean> pubSubStatus =
-                         WebParameterUtils.getPubSubStatusByManageStatus(manageStatus);
-                 isAcceptPublish = pubSubStatus.getF0();
-                 isAcceptSubscribe = pubSubStatus.getF1();
-             }
-             BrokerSyncStatusInfo brokerSyncStatusInfo =
-                     brokerSyncStatusInfoMap.get(brokerEntity.getBrokerId());
-             if (brokerSyncStatusInfo == null) {
-                 strBuffer.append("\"acceptPublish\":\"-\"")
-                         .append(",\"acceptSubscribe\":\"-\"")
-                         .append(",\"totalPartitionNum\":\"-\"")
-                         .append(",\"totalTopicStoreNum\":\"-\"")
-                         .append(",\"brokerManageStatus\":\"-\"");
-             } else {
-                 // the run-time flag is only reported when the manage status allows it
-                 if (isAcceptPublish) {
-                     strBuffer.append("\"acceptPublish\":")
-                             .append(brokerSyncStatusInfo.isAcceptPublish());
-                 } else {
-                     strBuffer.append("\"acceptPublish\":false");
-                 }
-                 if (isAcceptSubscribe) {
-                     strBuffer.append(",\"acceptSubscribe\":")
-                             .append(brokerSyncStatusInfo.isAcceptSubscribe());
-                 } else {
-                     strBuffer.append(",\"acceptSubscribe\":false");
-                 }
-                 // NOTE(review): as before, the totals cover every matched topic record
-                 // and are not restricted to the current broker - confirm this is intended
-                 for (List<BdbTopicConfEntity> topicEntityList : matchedTopicLists) {
-                     for (BdbTopicConfEntity topicEntity : topicEntityList) {
-                         if (topicEntity == null) {
-                             continue;
-                         }
-                         totalStoreNum += topicEntity.getNumTopicStores();
-                         totalNumPartCount +=
-                                 topicEntity.getNumTopicStores() * topicEntity.getNumPartitions();
-                     }
-                 }
-                 strBuffer.append(",\"totalPartitionNum\":")
-                         .append(totalNumPartCount)
-                         .append(",\"totalTopicStoreNum\":")
-                         .append(totalStoreNum)
-                         .append(",\"brokerManageStatus\":\"")
-                         .append(strManageStatus).append("\"");
-             }
-             strBuffer.append("}}");
-         }
-         strBuffer.append("],\"dataCount\":").append(totalCount).append("}");
-     } catch (Exception e) {
-         // discard any partially built text and answer with the failure reason
-         strBuffer.delete(0, strBuffer.length());
-         strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
-                 .append(e.getMessage()).append("\",\"dataCount\":0,\"data\":[]}");
-     }
-     return strBuffer;
- }
-
- /**
- * Modify topic info
- *
- * @param req
- * @return
- * @throws Exception
- */
- // #lizard forgives
- public StringBuilder adminModifyTopicEntityInfo(HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder();
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager, req.getParameter("confModAuthToken"));
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser", req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH, true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate", req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, new Date());
- Set<String> batchModTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
- true, false, null, strBuffer);
- Set<BdbBrokerConfEntity> batchBrokerEntitySet =
- WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"),
- brokerConfManager, true, strBuffer);
- String deleteWhen =
- WebParameterUtils.validDecodeStringParameter("deleteWhen", req.getParameter("deleteWhen"),
- TServerConstants.CFG_DELETEWHEN_MAX_LENGTH, false, null);
- String deletePolicy =
- WebParameterUtils.validDeletePolicyParameter("deletePolicy",
- req.getParameter("deletePolicy"), false, null);
- int numPartitions =
- WebParameterUtils.validIntDataParameter("numPartitions",
- req.getParameter("numPartitions"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- int unflushThreshold =
- WebParameterUtils.validIntDataParameter("unflushThreshold",
- req.getParameter("unflushThreshold"), false, TBaseConstants.META_VALUE_UNDEFINED, 0);
- int unflushInterval =
- WebParameterUtils.validIntDataParameter("unflushInterval",
- req.getParameter("unflushInterval"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- int numTopicStores =
- WebParameterUtils.validIntDataParameter("numTopicStores",
- req.getParameter("numTopicStores"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- int memCacheMsgCntInK =
- WebParameterUtils.validIntDataParameter("memCacheMsgCntInK",
- req.getParameter("memCacheMsgCntInK"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- int memCacheMsgSizeInMB =
- WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
- req.getParameter("memCacheMsgSizeInMB"), false, TBaseConstants.META_VALUE_UNDEFINED, 2);
- memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
- int memCacheFlushIntvl =
- WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
- req.getParameter("memCacheFlushIntvl"), false, TBaseConstants.META_VALUE_UNDEFINED, 4000);
- int unFlushDataHold =
- WebParameterUtils.validIntDataParameter("unflushDataHold",
- req.getParameter("unflushDataHold"), false, TBaseConstants.META_VALUE_UNDEFINED, 0);
- int maxMsgSizeInMB =
- WebParameterUtils.validIntDataParameter("maxMsgSizeInMB",
- req.getParameter("maxMsgSizeInMB"),
- false, TBaseConstants.META_VALUE_UNDEFINED,
- TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
-
- List<BdbTopicConfEntity> batchModBdbTopicEntities = new ArrayList<>();
- for (BdbBrokerConfEntity tgtEntity : batchBrokerEntitySet) {
- if (tgtEntity == null) {
- continue;
- }
- if (WebParameterUtils.checkBrokerInProcessing(tgtEntity.getBrokerId(), brokerConfManager, strBuffer)) {
- throw new Exception(strBuffer.toString());
- }
- ConcurrentHashMap<String, BdbTopicConfEntity> brokerTopicEntityMap =
- brokerConfManager.getBrokerTopicConfEntitySet(tgtEntity.getBrokerId());
- if ((brokerTopicEntityMap == null) || (brokerTopicEntityMap.isEmpty())) {
- throw new Exception(strBuffer.append("No topic configure in broker=")
- .append(tgtEntity.getBrokerId()).append(", please confirm the configure first!")
- .toString());
- }
- for (String itemTopicName : batchModTopicNames) {
- BdbTopicConfEntity oldEntity = brokerTopicEntityMap.get(itemTopicName);
- if (oldEntity == null) {
- throw new Exception(strBuffer.append("Not found the topic ")
- .append(itemTopicName).append("'s configure in broker=")
- .append(tgtEntity.getBrokerId()).append(", please confirm the configure first!")
- .toString());
- }
- if (!oldEntity.isValidTopicStatus()) {
- throw new Exception(strBuffer.append("Topic of ").append(itemTopicName)
- .append("is deleted softly in brokerId=").append(tgtEntity.getBrokerId())
- .append(", please resume the record or hard removed first!").toString());
- }
- boolean foundChange = false;
- BdbTopicConfEntity newEntity =
- new BdbTopicConfEntity(oldEntity.getBrokerId(), oldEntity.getBrokerIp(),
- oldEntity.getBrokerPort(), oldEntity.getTopicName(),
- oldEntity.getNumPartitions(), oldEntity.getUnflushThreshold(),
- oldEntity.getUnflushInterval(), oldEntity.getDeleteWhen(),
- oldEntity.getDeletePolicy(), oldEntity.getAcceptPublish(),
- oldEntity.getAcceptSubscribe(), oldEntity.getNumTopicStores(),
- oldEntity.getAttributes(),
- oldEntity.getCreateUser(), oldEntity.getCreateDate(), modifyUser, modifyDate);
- if ((!TStringUtils.isBlank(deleteWhen)) && (!deleteWhen.equals(oldEntity.getDeleteWhen()))) {
- foundChange = true;
- newEntity.setDeleteWhen(deleteWhen);
- }
- if ((!TStringUtils.isBlank(deletePolicy)) && (!deletePolicy.equals(oldEntity.getDeletePolicy()))) {
- foundChange = true;
- newEntity.setDeletePolicy(deletePolicy);
- }
- if ((numPartitions > 0) && (numPartitions != oldEntity.getNumPartitions())) {
- if (numPartitions < oldEntity.getNumPartitions()) {
- throw new Exception(strBuffer
- .append("Partition value is less than before," +
- "please confirm the configure first! brokerId=")
- .append(oldEntity.getBrokerId())
- .append(", topicName=").append(oldEntity.getTopicName())
- .append(", old Partition value is ").append(oldEntity.getNumPartitions())
- .append(", new Partition value is ").append(numPartitions).toString());
- }
- foundChange = true;
- newEntity.setNumPartitions(numPartitions);
- }
- if ((unflushThreshold >= 0) && (unflushThreshold != oldEntity.getUnflushThreshold())) {
- foundChange = true;
- newEntity.setUnflushThreshold(unflushThreshold);
- }
- if (unFlushDataHold >= 0 && unFlushDataHold != oldEntity.getUnflushDataHold()) {
- foundChange = true;
- newEntity.setUnflushDataHold(unFlushDataHold);
- }
- if (memCacheMsgCntInK >= 0 && memCacheMsgCntInK != oldEntity.getMemCacheMsgCntInK()) {
- foundChange = true;
- newEntity.appendAttributes(TStoreConstants.TOKEN_MCACHE_MSG_CNT,
- String.valueOf(memCacheMsgCntInK));
- }
- if (memCacheMsgSizeInMB >= 0 && memCacheMsgSizeInMB != oldEntity.getMemCacheMsgSizeInMB()) {
- foundChange = true;
- newEntity.appendAttributes(TStoreConstants.TOKEN_MCACHE_MSG_SIZE,
- String.valueOf(memCacheMsgSizeInMB));
- }
- if (maxMsgSizeInMB > 0) {
- int maxMsgSizeInB =
- SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB);
- if (maxMsgSizeInB != oldEntity.getMaxMsgSize()) {
- foundChange = true;
- newEntity.appendAttributes(TStoreConstants.TOKEN_MAX_MSG_SIZE,
- String.valueOf(maxMsgSizeInB));
- }
- }
- if (memCacheFlushIntvl >= 0 && memCacheFlushIntvl != oldEntity.getMemCacheFlushIntvl()) {
- foundChange = true;
- newEntity.appendAttributes(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL,
- String.valueOf(memCacheFlushIntvl));
- }
- if ((numTopicStores > 0) && (numTopicStores != oldEntity.getNumTopicStores())) {
- if (numTopicStores < oldEntity.getNumTopicStores()) {
- throw new Exception(strBuffer
- .append("TopicStores value is less than before," +
- "please confirm the configure first! brokerId=")
- .append(oldEntity.getBrokerId())
- .append(", topicName=").append(oldEntity.getTopicName())
- .append(", old TopicStores value is ").append(oldEntity.getNumTopicStores())
- .append(", new TopicStores value is ").append(numTopicStores).toString());
- }
- foundChange = true;
- newEntity.setNumTopicStores(numTopicStores);
- }
- if ((unflushInterval > 0) && (unflushInterval != oldEntity.getUnflushInterval())) {
- foundChange = true;
- newEntity.setUnflushInterval(unflushInterval);
- }
- String publishParaStr = req.getParameter("acceptPublish");
- if (!TStringUtils.isBlank(publishParaStr)) {
- boolean acceptPublish =
- WebParameterUtils.validBooleanDataParameter("acceptPublish",
- req.getParameter("acceptPublish"), true, true);
- if (acceptPublish != oldEntity.getAcceptPublish()) {
- foundChange = true;
- newEntity.setAcceptPublish(acceptPublish);
- }
- }
- String subscribeParaStr = req.getParameter("acceptSubscribe");
- if (!TStringUtils.isBlank(subscribeParaStr)) {
- boolean acceptSubscribe =
- WebParameterUtils.validBooleanDataParameter("acceptSubscribe",
- req.getParameter("acceptSubscribe"), true, true);
- if (acceptSubscribe != oldEntity.getAcceptSubscribe()) {
- foundChange = true;
- newEntity.setAcceptSubscribe(acceptSubscribe);
- }
- }
- if (!foundChange) {
- continue;
- }
- batchModBdbTopicEntities.add(newEntity);
- }
- }
- if (batchModBdbTopicEntities.isEmpty()) {
- throw new Exception("Not found data changed, please confirm the topic configure!");
- }
- try {
- for (BdbTopicConfEntity itemTopicEntity : batchModBdbTopicEntities) {
- BdbBrokerConfEntity brokerConfEntity =
- brokerConfManager.getBrokerDefaultConfigStoreInfo(itemTopicEntity.getBrokerId());
- if (brokerConfEntity == null) {
- continue;
- }
- ConcurrentHashMap<String, BdbTopicConfEntity> brokerTopicEntityMap =
- brokerConfManager.getBrokerTopicConfEntitySet(brokerConfEntity.getBrokerId());
- if ((brokerTopicEntityMap == null)
- || (brokerTopicEntityMap.isEmpty())) {
- continue;
- }
- BdbTopicConfEntity oldEntity =
- brokerTopicEntityMap.get(itemTopicEntity.getTopicName());
- if (oldEntity == null) {
- continue;
- }
- boolean isFastStart = true;
- if ((itemTopicEntity.getNumPartitions() != oldEntity.getNumPartitions())
- || (itemTopicEntity.getNumTopicStores() != oldEntity.getNumTopicStores())
- || (itemTopicEntity.getAcceptSubscribe() != oldEntity.getAcceptSubscribe())) {
- isFastStart = false;
- }
- if (WebParameterUtils.checkBrokerInProcessing(itemTopicEntity.getBrokerId(), brokerConfManager,
- null)) {
- continue;
- }
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- brokerConfManager.getBrokerRunSyncStatusInfo(itemTopicEntity.getBrokerId());
- boolean result = brokerConfManager.confModTopicConfig(itemTopicEntity);
- if (result) {
- if ((brokerSyncStatusInfo != null) && !isFastStart) {
- brokerSyncStatusInfo.setFastStart(isFastStart);
- }
- brokerConfEntity =
- brokerConfManager.getBrokerDefaultConfigStoreInfo(itemTopicEntity.getBrokerId());
- if (brokerConfEntity != null && !brokerConfEntity.isConfDataUpdated()) {
- brokerConfManager.updateBrokerConfChanged(
- brokerConfEntity.getBrokerId(), true, isFastStart);
- }
- }
- }
- } catch (Exception ee) {
- //
- }
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index 621ad5d..5a6558c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
-import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
@@ -112,19 +111,19 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
Set<String> filterCondSet = (Set<String>) result.retData1;
qryEntity.updModifyInfo(qryEntity.getDataVerId(),
consumeEnable, null, filterEnable, null);
- Map<String, List<GroupConsumeCtrlEntity>> qryResultSet =
- metaDataManager.getGroupConsumeCtrlConf(groupSet, topicNameSet);
+ Map<String, List<GroupConsumeCtrlEntity>> qryResultMap =
+ metaDataManager.getGroupConsumeCtrlConf(groupSet, topicNameSet, qryEntity);
// build return result
int totalCnt = 0;
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
- for (List<GroupConsumeCtrlEntity> consumeCtrlEntityList : qryResultSet.values()) {
+ for (List<GroupConsumeCtrlEntity> consumeCtrlEntityList : qryResultMap.values()) {
if (consumeCtrlEntityList == null || consumeCtrlEntityList.isEmpty()) {
continue;
}
for (GroupConsumeCtrlEntity entity : consumeCtrlEntityList) {
if (entity == null
- || !entity.isMatched(qryEntity)
- || !isFilterItemAllIncluded(filterCondSet, entity.getFilterCondStr())) {
+ || !WebParameterUtils.isFilterSetFullIncluded(
+ filterCondSet, entity.getFilterCondStr())) {
continue;
}
if (totalCnt++ > 0) {
@@ -357,7 +356,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
// parse groupCsmJsonSet field info
GroupConsumeCtrlEntity itemConf;
Map<String, String> itemsMap;
- HashMap<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
+ Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
metaDataManager.getTotalConfiguredTopicNames();
for (int j = 0; j < filterJsonArray.size(); j++) {
@@ -425,22 +424,4 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
return result.isSuccess();
}
- private boolean isFilterItemAllIncluded(Set<String> filterCondSet, String filterConsStr) {
- if (filterCondSet == null || filterCondSet.isEmpty()) {
- return true;
- }
- if (filterConsStr == null
- || (filterConsStr.length() == 2
- && filterConsStr.equals(TServerConstants.BLANK_FILTER_ITEM_STR))) {
- return false;
- }
- boolean allInc = true;
- for (String filterCond : filterCondSet) {
- if (!filterConsStr.contains(filterCond)) {
- allInc = false;
- break;
- }
- }
- return allInc;
- }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
index 4fd3470..476b1e9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
@@ -148,7 +148,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- private StringBuilder adminBatchAddGroupResCtrlConf(HttpServletRequest req) {
+ public StringBuilder adminBatchAddGroupResCtrlConf(HttpServletRequest req) {
return innBatchAddOrUpdGroupResCtrlConf(req, true);
}
@@ -168,7 +168,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- private StringBuilder adminBatchUpdGroupResCtrlConf(HttpServletRequest req) {
+ public StringBuilder adminBatchUpdGroupResCtrlConf(HttpServletRequest req) {
return innBatchAddOrUpdGroupResCtrlConf(req, false);
}
@@ -299,7 +299,6 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
return buildRetInfo(retInfo, sBuffer);
}
-
private StringBuilder innBatchAddOrUpdGroupResCtrlConf(HttpServletRequest req,
boolean isAddOp) {
ProcessResult result = new ProcessResult();
@@ -348,7 +347,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
// check and get topic control configure
GroupResCtrlEntity itemEntity;
Map<String, String> itemValueMap;
- HashMap<String, GroupResCtrlEntity> addRecordMap = new HashMap<>();
+ Map<String, GroupResCtrlEntity> addRecordMap = new HashMap<>();
for (int j = 0; j < ctrlJsonArray.size(); j++) {
itemValueMap = ctrlJsonArray.get(j);
// check and get operation info
@@ -433,20 +432,20 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
}
private StringBuilder buildRetInfo(List<GroupProcessResult> retInfo,
- StringBuilder sBuilder) {
+ StringBuilder sBuffer) {
int totalCnt = 0;
- WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (GroupProcessResult entry : retInfo) {
if (totalCnt++ > 0) {
- sBuilder.append(",");
+ sBuffer.append(",");
}
- sBuilder.append("{\"groupName\":\"").append(entry.getGroupName()).append("\"")
+ sBuffer.append("{\"groupName\":\"").append(entry.getGroupName()).append("\"")
.append(",\"success\":").append(entry.isSuccess())
.append(",\"errCode\":").append(entry.getErrCode())
.append(",\"errInfo\":\"").append(entry.getErrInfo()).append("\"}");
}
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
- return sBuilder;
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index 67fce58..3aa491c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -72,6 +72,11 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
// register modify method
registerModifyWebMethod("admin_set_cluster_default_setting",
"adminSetClusterDefSetting");
+
+ // Deprecated methods begin
+ // query method
+ registerQueryWebMethod("admin_query_def_flow_control_rule",
+ "adminQueryDefGroupFlowCtrlRule");
}
/**
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
index 297ac0b..b8c9190 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
@@ -296,7 +296,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
// check and get topic control configure
TopicCtrlEntity itemConf;
Map<String, String> itemConfMap;
- HashMap<String, TopicCtrlEntity> addRecordMap = new HashMap<>();
+ Map<String, TopicCtrlEntity> addRecordMap = new HashMap<>();
for (int j = 0; j < ctrlJsonArray.size(); j++) {
itemConfMap = ctrlJsonArray.get(j);
// check and get operation info
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index 3c18ba2..b81495a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -31,6 +31,7 @@ import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.apache.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.tubemq.server.common.statusdef.TopicStsChgType;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
@@ -44,15 +45,11 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDepl
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class WebTopicDeployHandler extends AbstractWebHandler {
- private static final Logger logger =
- LoggerFactory.getLogger(WebTopicDeployHandler.class);
+public class WebTopicDeployHandler extends AbstractWebHandler {
/**
* Constructor
@@ -68,184 +65,140 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
@Override
public void registerWebApiMethod() {
// register query method
- registerQueryWebMethod("admin_query_topic_info",
- "adminQueryTopicCfgEntityAndRunInfo");
+ registerQueryWebMethod("admin_query_topic_deploy_info",
+ "adminNewQueryTopicCfgAndRunInfo");
registerQueryWebMethod("admin_query_broker_topic_config_info",
"adminQueryBrokerTopicCfgAndRunInfo");
registerQueryWebMethod("admin_query_topicName",
"adminQuerySimpleTopicName");
registerQueryWebMethod("admin_query_brokerId",
"adminQuerySimpleBrokerId");
+
// register modify method
+ registerModifyWebMethod("admin_add_topic_deploy_info",
+ "adminAddTopicDeployInfo");
+ registerModifyWebMethod("admin_bath_add_topic_deploy_info",
+ "adminBatchAddTopicDeployInfo");
+ registerModifyWebMethod("admin_update_topic_deploy_info",
+ "adminModifyTopicDeployInfo");
+ registerModifyWebMethod("admin_batch_update_topic_deploy_info",
+ "adminBatchUpdTopicDeployInfo");
+ registerModifyWebMethod("admin_delete_topic_deploy_info",
+ "adminDelTopicDeployInfo");
+ registerModifyWebMethod("admin_redo_deleted_topic_deploy_info",
+ "adminRedoDeletedTopicDeployInfo");
+ registerModifyWebMethod("admin_remove_topic_deploy_info",
+ "adminRmvTopicDeployInfo");
+
+ // Deprecated methods begin
+ // query
+ registerQueryWebMethod("admin_query_topic_info",
+ "adminOldQueryTopicCfgAndRunInfo");
+ // modify
registerModifyWebMethod("admin_add_new_topic_record",
- "adminAddTopicEntityInfo");
+ "adminAddTopicDeployInfo");
registerModifyWebMethod("admin_bath_add_new_topic_record",
- "adminBatchAddTopicEntityInfo");
+ "adminBatchAddTopicDeployInfo");
registerModifyWebMethod("admin_modify_topic_info",
- "adminModifyTopicEntityInfo");
+ "adminModifyTopicDeployInfo");
registerModifyWebMethod("admin_delete_topic_info",
- "adminDeleteTopicEntityInfo");
+ "adminDelTopicDeployInfo");
registerModifyWebMethod("admin_redo_deleted_topic_info",
- "adminRedoDeleteTopicEntityInfo");
+ "adminRedoDeletedTopicDeployInfo");
registerModifyWebMethod("admin_remove_topic_info",
- "adminRemoveTopicEntityInfo");
+ "adminRmvTopicDeployInfo");
+ // Deprecated methods end
}
+
/**
- * Query topic info
+ * Query topic info with new format return
*
* @param req
* @return
*/
- public StringBuilder adminQueryTopicCfgEntityAndRunInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- TopicDeployEntity qryEntity = new TopicDeployEntity();
- // get queried operation info, for createUser, modifyUser, dataVersionId
- if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- // check and get topicName field
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- Set<String> topicNameSet = (Set<String>) result.retData1;
- // check and get brokerId field
- if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
- // get brokerPort field
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
- false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- int brokerPort = (int) result.getRetData();
- // get and valid topicProps info
- if (!WebParameterUtils.getTopicPropInfo(req, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
- // get and valid TopicStatusId info
- if (!WebParameterUtils.getTopicStatusParamValue(req,
- false, TopicStatus.STATUS_TOPIC_UNDEFINED, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- TopicStatus topicStatus = (TopicStatus) result.getRetData();
- qryEntity.updModifyInfo(qryEntity.getDataVerId(),
- TBaseConstants.META_VALUE_UNDEFINED,
- brokerPort, null, topicStatus, topicProps);
- Map<String, List<TopicDeployEntity>> topicDeployInfoMap =
- metaDataManager.getTopicConfEntityMap(topicNameSet, brokerIdSet, qryEntity);
- // build query result
- return buildQueryResult(sBuffer, true, topicDeployInfoMap);
+ public StringBuilder adminNewQueryTopicCfgAndRunInfo(HttpServletRequest req) {
+ return innQueryTopicConfAndRunInfo(req, true);
}
- private StringBuilder buildQueryResult(StringBuilder sBuffer,
- boolean withAuthInfo,
- Map<String, List<TopicDeployEntity>> topicDeployInfoMap) {
- // build query result
- int totalCnt = 0;
- int totalCfgNumPartCount = 0;
- int totalRunNumPartCount = 0;
- boolean isSrvAcceptPublish = false;
- boolean isSrvAcceptSubscribe = false;
- boolean isAcceptPublish = false;
- boolean isAcceptSubscribe = false;
- ManageStatus manageStatus;
- Tuple2<Boolean, Boolean> pubSubStatus;
- TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
- WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
- for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) {
- totalCfgNumPartCount = 0;
- totalRunNumPartCount = 0;
- isSrvAcceptPublish = false;
- isSrvAcceptSubscribe = false;
- isAcceptPublish = false;
- isAcceptSubscribe = false;
- TopicCtrlEntity ctrlEntity =
- metaDataManager.getTopicCtrlByTopicName(entry.getKey());
- ctrlEntity.toWebJsonStr(sBuffer, true, false);
- sBuffer.append(",\"deployInfo\":[");
- int brokerCount = 0;
- for (TopicDeployEntity entity : entry.getValue()) {
- if (brokerCount++ > 0) {
- sBuffer.append(",");
- }
- totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
- entity.toWebJsonStr(sBuffer, true, false);
- sBuffer.append("\",\"runInfo\":{");
- BrokerConfEntity brokerConfEntity =
- metaDataManager.getBrokerConfByBrokerId(entity.getBrokerId());
- String strManageStatus = "-";
- if (brokerConfEntity != null) {
- manageStatus = brokerConfEntity.getManageStatus();
- strManageStatus = manageStatus.getDescription();
- pubSubStatus = manageStatus.getPubSubStatus();
- isAcceptPublish = pubSubStatus.getF0();
- isAcceptSubscribe = pubSubStatus.getF1();
- }
- BrokerInfo broker = new BrokerInfo(entity.getBrokerId(),
- entity.getBrokerIp(), entity.getBrokerPort());
- TopicInfo topicInfo =
- topicPSInfoManager.getTopicInfo(entity.getTopicName(), broker);
- if (topicInfo == null) {
- sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
- .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
- } else {
- if (isAcceptPublish) {
- sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish());
- if (topicInfo.isAcceptPublish()) {
- isSrvAcceptPublish = true;
- }
- } else {
- sBuffer.append("\"acceptPublish\":false");
- }
- if (isAcceptSubscribe) {
- sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe());
- if (topicInfo.isAcceptSubscribe()) {
- isSrvAcceptSubscribe = true;
- }
- } else {
- sBuffer.append(",\"acceptSubscribe\":false");
- }
- totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
- sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum())
- .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum())
- .append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\"");
- }
- sBuffer.append("}}");
- }
- sBuffer.append("],\"infoCount\":").append(brokerCount)
- .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
- .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
- .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
- .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount);
- if (withAuthInfo) {
- sBuffer.append(",\"authConsumeGroup\":[");
- List<GroupConsumeCtrlEntity> groupCtrlInfoLst =
- metaDataManager.getConsumeCtrlByTopic(entry.getKey());
- int countJ = 0;
- for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
- if (countJ++ > 0) {
- sBuffer.append(",");
- }
- groupEntity.toWebJsonStr(sBuffer, true, false);
- }
- sBuffer.append("],\"groupCount\":").append(countJ);
- }
- sBuffer.append("}");
- }
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
- return sBuffer;
+ /**
+ * Query topic info with old format return
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminOldQueryTopicCfgAndRunInfo(HttpServletRequest req) {
+ return innQueryTopicConfAndRunInfo(req, false);
+ }
+
+ /**
+ * Add new topic deployment record
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminAddTopicDeployInfo(HttpServletRequest req) {
+ return innAddOrUpdTopicDeployInfo(req, true);
+ }
+
+ /**
+ * Modify topic deployment info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminModifyTopicDeployInfo(HttpServletRequest req) {
+ return innAddOrUpdTopicDeployInfo(req, false);
+ }
+
+ /**
+ * Add new topic deployment record in batch
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminBatchAddTopicDeployInfo(HttpServletRequest req) {
+ return innBatchAddOrUpdTopicDeployInfo(req, true);
+ }
+
+ /**
+ * Update topic deployment record in batch
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminBatchUpdTopicDeployInfo(HttpServletRequest req) {
+ return innBatchAddOrUpdTopicDeployInfo(req, false);
+ }
+
+ /**
+ * Delete topic info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminDelTopicDeployInfo(HttpServletRequest req) {
+ return innModifyTopicDeployStatusInfo(req, TopicStsChgType.STATUS_CHANGE_SOFT_DELETE);
+ }
+
+ /**
+ * Remove topic info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminRmvTopicDeployInfo(HttpServletRequest req) {
+ return innModifyTopicDeployStatusInfo(req, TopicStsChgType.STATUS_CHANGE_REMOVE);
+ }
+
+ /**
+ * Redo delete topic info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminRedoDeletedTopicDeployInfo(HttpServletRequest req) {
+ return innModifyTopicDeployStatusInfo(req, TopicStsChgType.STATUS_CHANGE_REDO_SFDEL);
}
/**
@@ -350,35 +303,35 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
*/
public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) {
ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
+ StringBuilder sBuffer = new StringBuilder(512);
if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.COMPSBROKERID, false, sBuilder, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Set<Integer> brokerIds = (Set<Integer>) result.retData1;
Map<Integer, Set<String>> brokerTopicConfigMap =
metaDataManager.getBrokerTopicConfigInfo(brokerIds);
// build query result
int dataCount = 0;
- WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (Map.Entry<Integer, Set<String>> entry : brokerTopicConfigMap.entrySet()) {
if (dataCount++ > 0) {
- sBuilder.append(",");
+ sBuffer.append(",");
}
- sBuilder.append("{\"brokerId\":").append(entry.getKey()).append(",\"topicName\":[");
+ sBuffer.append("{\"brokerId\":").append(entry.getKey()).append(",\"topicName\":[");
int topicCnt = 0;
Set<String> topicSet = entry.getValue();
for (String topic : topicSet) {
if (topicCnt++ > 0) {
- sBuilder.append(",");
+ sBuffer.append(",");
}
- sBuilder.append("\"").append(topic).append("\"");
+ sBuffer.append("\"").append(topic).append("\"");
}
- sBuilder.append("],\"topicCount\":").append(topicCnt).append("}");
+ sBuffer.append("],\"topicCount\":").append(topicCnt).append("}");
}
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, dataCount);
- return sBuilder;
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, dataCount);
+ return sBuffer;
}
/**
@@ -437,109 +390,315 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
 /**
- * Add new topic record
+ * Query topic deploy info and render it in the new or the old response layout
 *
 * @param req
+ * @param isNewVer true to answer via buildNewQueryResult(), false via buildOldQueryResult()
 * @return
 */
- public StringBuilder adminAddTopicEntityInfo(HttpServletRequest req) {
+ private StringBuilder innQueryTopicConfAndRunInfo(HttpServletRequest req, boolean isNewVer) {
 ProcessResult result = new ProcessResult();
 StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- // check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
+ TopicDeployEntity qryEntity = new TopicDeployEntity();
+ // get queried operation info, for createUser, modifyUser, dataVersionId
+ if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, result)) {
 WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
 return sBuffer;
 }
- BaseEntity opEntity = (BaseEntity) result.getRetData();
- // check and get topicName info
+ // check and get topicName field
 if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
+ WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
 WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
 return sBuffer;
 }
 Set<String> topicNameSet = (Set<String>) result.retData1;
- // check and get brokerId info
+ // check and get brokerId field
 if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
+ WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
 WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
 return sBuffer;
 }
 Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
- // get and valid TopicPropGroup info
+ // get brokerPort field
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ int brokerPort = (int) result.getRetData();
+ // get and validate topicProps info
 if (!WebParameterUtils.getTopicPropInfo(req, null, sBuffer, result)) {
 WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
 return sBuffer;
 }
- TopicPropGroup topicPropInfo = (TopicPropGroup) result.getRetData();
- /* check max message size
- if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.MAXMSGSIZEINMB, false,
- TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
- TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
- TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB,
- result)) {
+ TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
+ // get withGroupAuthInfo field
+ if (!WebParameterUtils.getBooleanParamValue(req, WebFieldDef.WITHGROUPAUTHINFO,
+ false, false, sBuffer, result)) {
 WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuilder;
+ return sBuffer;
 }
- int inMaxMsgSizeMB = (int) result.getRetData();
- */
- List<TopicProcessResult> retInfo =
- metaDataManager.addTopicDeployInfo(opEntity, brokerIdSet,
- topicNameSet, topicPropInfo, sBuffer, result);
- return buildRetInfo(retInfo, sBuffer);
- }
-
- /**
- * Add new topic record in batch
- *
- * @param req
- * @return
- */
- public StringBuilder adminBatchAddTopicEntityInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ Boolean withGroupAuthInfo = (Boolean) result.getRetData();
+ // get and validate TopicStatusId info
+ if (!WebParameterUtils.getTopicStatusParamValue(req,
+ false, TopicStatus.STATUS_TOPIC_UNDEFINED, sBuffer, result)) {
 WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
 return sBuffer;
 }
- // check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
+ TopicStatus topicStatus = (TopicStatus) result.getRetData();
+ // record the parsed brokerPort, topicStatus and topicProps on qryEntity, which is
+ // passed to the metadata lookup together with the topic name and broker id sets
+ qryEntity.updModifyInfo(qryEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED,
+ brokerPort, null, topicStatus, topicProps);
+ Map<String, List<TopicDeployEntity>> topicDeployInfoMap =
+ metaDataManager.getTopicConfEntityMap(topicNameSet, brokerIdSet, qryEntity);
+ // build query result in the layout selected by isNewVer
+ if (isNewVer) {
+ return buildNewQueryResult(withGroupAuthInfo, sBuffer, topicDeployInfoMap);
+ } else {
+ return buildOldQueryResult(sBuffer, topicDeployInfoMap);
 }
- BaseEntity defOpEntity = (BaseEntity) result.getRetData();
- // check and get add record map
- if (!getTopicDeployJsonSetInfo(req, true,
- defOpEntity, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
+ }
+
+ /**
+ * Build the query response in the old layout: one object per topic holding the
+ * deploy and run information of every broker, the aggregated partition counters
+ * and the topic's authorization data.
+ *
+ * @param sBuffer the buffer the response is appended to
+ * @param topicDeployInfoMap the deploy records to render, keyed by topic name
+ * @return sBuffer, holding the complete response
+ */
+ private StringBuilder buildOldQueryResult(StringBuilder sBuffer,
+ Map<String, List<TopicDeployEntity>> topicDeployInfoMap) {
+ int totalCnt = 0;
+ TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+ ClusterSettingEntity defSetting = metaDataManager.getClusterDefSetting(false);
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) {
+ // separate the topic objects and count them, so that the total handed to
+ // buildSuccessWithDataRetEnd() matches the number of emitted items
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ int totalCfgNumPartCount = 0;
+ int totalRunNumPartCount = 0;
+ boolean isSrvAcceptPublish = false;
+ boolean isSrvAcceptSubscribe = false;
+ boolean enableAuthCtrl = false;
+ // the cluster default applies unless the topic has its own control record
+ int maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
+ TopicCtrlEntity ctrlEntity =
+ metaDataManager.getTopicCtrlByTopicName(entry.getKey());
+ if (ctrlEntity != null) {
+ maxMsgSizeInMB = ctrlEntity.getMaxMsgSizeInMB();
+ enableAuthCtrl = ctrlEntity.getAuthCtrlStatus().isEnable();
+ }
+ sBuffer.append("{\"topicName\":\"").append(entry.getKey())
+ .append("\",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB)
+ .append(",\"topicInfo\":[");
+ int brokerCount = 0;
+ for (TopicDeployEntity entity : entry.getValue()) {
+ if (brokerCount++ > 0) {
+ sBuffer.append(",");
+ }
+ totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
+ entity.toWebJsonStr(sBuffer, true, false);
+ // NOTE(review): the leading quote assumes toWebJsonStr() leaves a string
+ // value open at the end of its output - confirm against the entity class
+ sBuffer.append("\",\"runInfo\":{");
+ // the switches are evaluated per broker, so that a broker without a
+ // configure record does not inherit the values of the previous broker
+ boolean isAcceptPublish = false;
+ boolean isAcceptSubscribe = false;
+ String strManageStatus = "-";
+ BrokerConfEntity brokerConfEntity =
+ metaDataManager.getBrokerConfByBrokerId(entity.getBrokerId());
+ if (brokerConfEntity != null) {
+ ManageStatus manageStatus = brokerConfEntity.getManageStatus();
+ strManageStatus = manageStatus.getDescription();
+ Tuple2<Boolean, Boolean> pubSubStatus = manageStatus.getPubSubStatus();
+ isAcceptPublish = pubSubStatus.getF0();
+ isAcceptSubscribe = pubSubStatus.getF1();
+ }
+ BrokerInfo broker = new BrokerInfo(entity.getBrokerId(),
+ entity.getBrokerIp(), entity.getBrokerPort());
+ TopicInfo topicInfo =
+ topicPSInfoManager.getTopicInfo(entity.getTopicName(), broker);
+ if (topicInfo == null) {
+ // no run-time record for this topic on the broker
+ sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
+ .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
+ } else {
+ // the run-time switch only counts while the broker's manage status allows it
+ if (isAcceptPublish) {
+ sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish());
+ if (topicInfo.isAcceptPublish()) {
+ isSrvAcceptPublish = true;
+ }
+ } else {
+ sBuffer.append("\"acceptPublish\":false");
+ }
+ if (isAcceptSubscribe) {
+ sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe());
+ if (topicInfo.isAcceptSubscribe()) {
+ isSrvAcceptSubscribe = true;
+ }
+ } else {
+ sBuffer.append(",\"acceptSubscribe\":false");
+ }
+ totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
+ sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum())
+ .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum())
+ .append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\"");
+ }
+ // close runInfo and the deploy object
+ sBuffer.append("}}");
+ }
+ // authData is an object: its members follow directly and it is closed with "}"
+ sBuffer.append("],\"infoCount\":").append(brokerCount)
+ .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
+ .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
+ .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
+ .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount)
+ .append(",\"authData\":{");
+ if (enableAuthCtrl) {
+ // enableAuthCtrl is only set from a non-null ctrlEntity
+ sBuffer.append("\"enableAuthControl\":").append(enableAuthCtrl)
+ .append(",\"createUser\":\"").append(ctrlEntity.getModifyUser())
+ .append("\",\"createDate\":\"").append(ctrlEntity.getModifyDateStr())
+ .append("\",\"authConsumeGroup\":[");
+ List<GroupConsumeCtrlEntity> groupCtrlInfoLst =
+ metaDataManager.getConsumeCtrlByTopic(entry.getKey());
+ int itemCount = 0;
+ for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
+ if (itemCount++ > 0) {
+ sBuffer.append(",");
+ }
+ sBuffer.append("{\"groupName\":\"").append(groupEntity.getGroupName())
+ .append("\",\"createUser\":\"").append(groupEntity.getCreateUser())
+ .append("\",\"createDate\":\"").append(groupEntity.getCreateDateStr())
+ .append("\",\"modifyUser\":\"").append(groupEntity.getModifyUser())
+ .append("\",\"modifyDate\":\"").append(groupEntity.getModifyDateStr())
+ .append("\"}");
+ }
+ sBuffer.append("],\"groupCount\":").append(itemCount).append(",\"authFilterCondSet\":[");
+ itemCount = 0;
+ for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
+ if (itemCount++ > 0) {
+ sBuffer.append(",");
+ }
+ // condStatus is 2 for an enabled filter and 0 otherwise
+ boolean filterEnabled = groupEntity.getFilterEnable().isEnable();
+ int condStatusId = filterEnabled ? 2 : 0;
+ sBuffer.append("{\"groupName\":\"").append(groupEntity.getGroupName())
+ .append("\",\"condStatus\":").append(condStatusId);
+ if (!filterEnabled) {
+ sBuffer.append(",\"filterConds\":\"\"");
+ } else {
+ sBuffer.append(",\"filterConds\":\"")
+ .append(groupEntity.getFilterCondStr()).append("\"");
+ }
+ sBuffer.append(",\"createDate\":\"").append(groupEntity.getCreateDateStr())
+ .append("\",\"modifyUser\":\"").append(groupEntity.getModifyUser())
+ .append("\",\"modifyDate\":\"").append(groupEntity.getModifyDateStr())
+ .append("\"}");
+ }
+ sBuffer.append("],\"filterCount\":").append(itemCount);
+ }
+ // close authData and the topic object
+ sBuffer.append("}}");
+ }
- Map<String, TopicDeployEntity> addRecordMap =
- (Map<String, TopicDeployEntity>) result.getRetData();
- List<TopicProcessResult> retInfo = new ArrayList<>();
- for (TopicDeployEntity topicDeployInfo : addRecordMap.values()) {
- retInfo.add(metaDataManager.addTopicDeployInfo(topicDeployInfo, sBuffer, result));
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
+ }
+
+ /**
+ * Build the query response in the new layout: one object per topic that starts
+ * with the topic control record, followed by the deploy and run information of
+ * every broker, the aggregated partition counters and, on request, the consume
+ * group control records of the topic.
+ *
+ * @param withAuthInfo whether to append the groupAuthInfo list of each topic
+ * @param sBuffer the buffer the response is appended to
+ * @param topicDeployMap the deploy records to render, keyed by topic name
+ * @return sBuffer, holding the complete response
+ */
+ private StringBuilder buildNewQueryResult(boolean withAuthInfo,
+ StringBuilder sBuffer,
+ Map<String, List<TopicDeployEntity>> topicDeployMap) {
+ int totalCnt = 0;
+ TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployMap.entrySet()) {
+ TopicCtrlEntity ctrlEntity =
+ metaDataManager.getTopicCtrlByTopicName(entry.getKey());
+ // the item is opened by the control record, so a topic without one
+ // cannot be rendered in this layout and is left out of the result
+ if (ctrlEntity == null) {
+ continue;
+ }
+ // separate the topic objects and count them, so that the total handed to
+ // buildSuccessWithDataRetEnd() matches the number of emitted items
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ int totalCfgNumPartCount = 0;
+ int totalRunNumPartCount = 0;
+ boolean isSrvAcceptPublish = false;
+ boolean isSrvAcceptSubscribe = false;
+ ctrlEntity.toWebJsonStr(sBuffer, true, false);
+ sBuffer.append(",\"deployInfo\":[");
+ int brokerCount = 0;
+ for (TopicDeployEntity entity : entry.getValue()) {
+ if (brokerCount++ > 0) {
+ sBuffer.append(",");
+ }
+ totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
+ entity.toWebJsonStr(sBuffer, true, false);
+ // NOTE(review): unlike the deployInfo continuation above, this one starts
+ // with a quote although both follow toWebJsonStr(.., true, false) - confirm
+ sBuffer.append("\",\"runInfo\":{");
+ // the switches are evaluated per broker, so that a broker without a
+ // configure record does not inherit the values of the previous broker
+ boolean isAcceptPublish = false;
+ boolean isAcceptSubscribe = false;
+ String strManageStatus = "-";
+ BrokerConfEntity brokerConfEntity =
+ metaDataManager.getBrokerConfByBrokerId(entity.getBrokerId());
+ if (brokerConfEntity != null) {
+ ManageStatus manageStatus = brokerConfEntity.getManageStatus();
+ strManageStatus = manageStatus.getDescription();
+ Tuple2<Boolean, Boolean> pubSubStatus = manageStatus.getPubSubStatus();
+ isAcceptPublish = pubSubStatus.getF0();
+ isAcceptSubscribe = pubSubStatus.getF1();
+ }
+ BrokerInfo broker = new BrokerInfo(entity.getBrokerId(),
+ entity.getBrokerIp(), entity.getBrokerPort());
+ TopicInfo topicInfo =
+ topicPSInfoManager.getTopicInfo(entity.getTopicName(), broker);
+ if (topicInfo == null) {
+ // no run-time record for this topic on the broker
+ sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
+ .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
+ } else {
+ // the run-time switch only counts while the broker's manage status allows it
+ if (isAcceptPublish) {
+ sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish());
+ if (topicInfo.isAcceptPublish()) {
+ isSrvAcceptPublish = true;
+ }
+ } else {
+ sBuffer.append("\"acceptPublish\":false");
+ }
+ if (isAcceptSubscribe) {
+ sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe());
+ if (topicInfo.isAcceptSubscribe()) {
+ isSrvAcceptSubscribe = true;
+ }
+ } else {
+ sBuffer.append(",\"acceptSubscribe\":false");
+ }
+ totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
+ sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum())
+ .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum())
+ .append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\"");
+ }
+ // close runInfo and the deploy object
+ sBuffer.append("}}");
+ }
+ sBuffer.append("],\"infoCount\":").append(brokerCount)
+ .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
+ .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
+ .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
+ .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount);
+ if (withAuthInfo) {
+ sBuffer.append(",\"groupAuthInfo\":[");
+ List<GroupConsumeCtrlEntity> groupCtrlInfoLst =
+ metaDataManager.getConsumeCtrlByTopic(entry.getKey());
+ int countJ = 0;
+ for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
+ if (countJ++ > 0) {
+ sBuffer.append(",");
+ }
+ // NOTE(review): the control and deploy entities rendered with the same
+ // final argument (false) are closed explicitly by this method, the group
+ // entries are not - confirm they come out as complete objects
+ groupEntity.toWebJsonStr(sBuffer, true, false);
+ }
+ sBuffer.append("],\"groupCount\":").append(countJ);
+ }
+ // close the topic object opened by ctrlEntity.toWebJsonStr()
+ sBuffer.append("}");
+ }
- return buildRetInfo(retInfo, sBuffer);
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
 }
- /**
- * Modify topic info
- *
- * @param req
- * @return
- * @throws Exception
- */
- // #lizard forgives
- public StringBuilder adminModifyTopicEntityInfo(HttpServletRequest req) {
+ private StringBuilder innAddOrUpdTopicDeployInfo(HttpServletRequest req, boolean isAddOp) {
ProcessResult result = new ProcessResult();
StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
@@ -549,7 +708,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
return sBuffer;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
@@ -573,45 +732,26 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
- // modify records
- List<TopicProcessResult> retInfo =
- metaDataManager.modTopicConfig(opEntity, brokerIdSet,
- topicNameSet, topicProps, sBuffer, result);
+ TopicPropGroup topicPropInfo = (TopicPropGroup) result.getRetData();
+ // set valid topicStatus info
+ TopicStatus topicStatus = TopicStatus.STATUS_TOPIC_UNDEFINED;
+ if (isAddOp) {
+ topicStatus = TopicStatus.STATUS_TOPIC_OK;
+ }
+ // add record
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ for (String topicName : topicNameSet) {
+ for (Integer brokerId : brokerIdSet) {
+ retInfo.add(metaDataManager.addOrUpdTopicDeployInfo(isAddOp,
+ opEntity, brokerId, topicName, topicStatus,
+ topicPropInfo, sBuffer, result));
+ }
+ }
return buildRetInfo(retInfo, sBuffer);
}
- /**
- * Delete topic info
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminDeleteTopicEntityInfo(HttpServletRequest req) {
- return innModifyTopicStatusEntityInfo(req, TopicStatus.STATUS_TOPIC_SOFT_DELETE);
- }
-
- /**
- * Remove topic info
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminRemoveTopicEntityInfo(HttpServletRequest req) {
- return innModifyTopicStatusEntityInfo(req, TopicStatus.STATUS_TOPIC_SOFT_REMOVE);
- }
-
- /**
- * Redo delete topic info
- *
- * @param req
- * @return
- * @throws Exception
- */
- // #lizard forgives
- public StringBuilder adminRedoDeleteTopicEntityInfo(HttpServletRequest req) {
+ private StringBuilder innBatchAddOrUpdTopicDeployInfo(HttpServletRequest req,
+ boolean isAddOp) {
ProcessResult result = new ProcessResult();
StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
@@ -621,55 +761,43 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
return sBuffer;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- BaseEntity opEntity = (BaseEntity) result.getRetData();
- // check and get topicName info
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
+ BaseEntity defOpEntity = (BaseEntity) result.getRetData();
+ // check and get add record map
+ if (!getTopicDeployJsonSetInfo(req, true, defOpEntity, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
- // check and get brokerId info
- if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
+ Map<String, TopicDeployEntity> addRecordMap =
+ (Map<String, TopicDeployEntity>) result.getRetData();
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ for (TopicDeployEntity topicDeployInfo : addRecordMap.values()) {
+ retInfo.add(metaDataManager.addOrUpdTopicDeployInfo(isAddOp,
+ topicDeployInfo, sBuffer, result));
}
- Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
- // modify records
- List<TopicProcessResult> retInfo =
- metaDataManager.modRedoDelTopicConf(opEntity, brokerIdSet,
- topicNameSet, sBuffer, result);
return buildRetInfo(retInfo, sBuffer);
}
-
-
- private boolean getTopicDeployJsonSetInfo(HttpServletRequest req, boolean isAdd,
- BaseEntity defOpEntity,
- List<Map<String, String>> defValue,
- StringBuilder sBuffer,
+ private boolean getTopicDeployJsonSetInfo(HttpServletRequest req, boolean isAddOp,
+ BaseEntity defOpEntity, StringBuilder sBuffer,
ProcessResult result) {
if (!WebParameterUtils.getJsonArrayParamValue(req,
- WebFieldDef.TOPICJSONSET, true, defValue, result)) {
+ WebFieldDef.TOPICJSONSET, true, null, result)) {
return result.success;
}
List<Map<String, String>> deployJsonArray =
(List<Map<String, String>>) result.retData1;
- // check and get cluster default setting info
- ClusterSettingEntity defClusterSetting =
- metaDataManager.getClusterDefSetting(false);
TopicDeployEntity itemConf;
Map<String, TopicDeployEntity> addRecordMap = new HashMap<>();
// check and get topic deployment configure
for (int j = 0; j < deployJsonArray.size(); j++) {
Map<String, String> confMap = deployJsonArray.get(j);
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(confMap, isAdd, defOpEntity, sBuffer, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(confMap,
+ isAddOp, defOpEntity, sBuffer, result)) {
return result.isSuccess();
}
BaseEntity itemOpEntity = (BaseEntity) result.getRetData();
@@ -687,21 +815,10 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
(BrokerConfEntity) result.getRetData();
// get and valid TopicPropGroup info
if (!WebParameterUtils.getTopicPropInfo(confMap,
- defClusterSetting.getClsDefTopicProps(), sBuffer, result)) {
+ (isAddOp ? brokerConf.getTopicProps() : null), sBuffer, result)) {
return result.isSuccess();
}
TopicPropGroup topicPropInfo = (TopicPropGroup) result.getRetData();
- /* check max message size
- if (!WebParameterUtils.getIntParamValue(confMap,
- WebFieldDef.MAXMSGSIZEINMB, false,
- TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
- TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
- TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB,
- result)) {
- return result.isSuccess();
- }
- int inMaxMsgSizeMB = (int) result.getRetData();
- */
// get topicNameId field
int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
TopicCtrlEntity topicCtrlEntity =
@@ -709,11 +826,16 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
if (topicCtrlEntity != null) {
topicNameId = topicCtrlEntity.getTopicId();
}
- itemConf = new TopicDeployEntity(itemOpEntity, brokerConf.getBrokerId(),
- brokerConf.getBrokerIp(), brokerConf.getBrokerPort(), topicName);
+ // set valid topicStatus info
+ TopicStatus topicStatus = TopicStatus.STATUS_TOPIC_UNDEFINED;
+ if (isAddOp) {
+ topicStatus = TopicStatus.STATUS_TOPIC_OK;
+ }
+ itemConf = new TopicDeployEntity(itemOpEntity,
+ brokerConf.getBrokerId(), topicName);
itemConf.updModifyInfo(itemOpEntity.getDataVerId(), topicNameId,
- TBaseConstants.META_VALUE_UNDEFINED, null,
- TopicStatus.STATUS_TOPIC_OK, topicPropInfo);
+ brokerConf.getBrokerPort(), brokerConf.getBrokerIp(),
+ topicStatus, topicPropInfo);
addRecordMap.put(itemConf.getRecordKey(), itemConf);
}
// check result
@@ -751,34 +873,33 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
 private StringBuilder buildRetInfo(List<TopicProcessResult> retInfo,
- StringBuilder sBuilder) {
+ StringBuilder sBuffer) {
+ // render every per-record outcome as one object of the response's data list;
+ // totalCnt acts as separator guard and as the item count handed to the closing call
 int totalCnt = 0;
- WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
 for (TopicProcessResult entry : retInfo) {
 if (totalCnt++ > 0) {
- sBuilder.append(",");
+ sBuffer.append(",");
 }
- sBuilder.append("{\"brokerId\":").append(entry.getBrokerId())
+ // NOTE(review): topicName and errInfo are appended without escaping - confirm
+ // that neither can contain a double quote or a backslash
+ sBuffer.append("{\"brokerId\":").append(entry.getBrokerId())
 .append(",\"topicName\":\"").append(entry.getTopicName()).append("\"")
 .append(",\"success\":").append(entry.isSuccess())
 .append(",\"errCode\":").append(entry.getErrCode())
 .append(",\"errInfo\":\"").append(entry.getErrInfo()).append("\"}");
 }
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
- return sBuilder;
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
 }
/**
- * Internal method to perform deletion and removal of topic
+ * Internal method to perform topic deploy status change
*
* @param req
- * @param topicStatus
+ * @param chgType
* @return
*/
- // #lizard forgives
- private StringBuilder innModifyTopicStatusEntityInfo(HttpServletRequest req,
- TopicStatus topicStatus) {
+ private StringBuilder innModifyTopicDeployStatusInfo(HttpServletRequest req,
+ TopicStsChgType chgType) {
ProcessResult result = new ProcessResult();
StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
@@ -807,10 +928,14 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
return sBuffer;
}
Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
- // modify records
- List<TopicProcessResult> retInfo =
- metaDataManager.modDelOrRmvTopicConf(opEntity, brokerIdSet,
- topicNameSet, topicStatus, sBuffer, result);
+ // modify record status
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ for (Integer brokerId : brokerIdSet) {
+ for (String topicName : topicNameSet) {
+ retInfo.add(metaDataManager.updTopicDeployStatusInfo(opEntity,
+ brokerId, topicName, chgType, sBuffer, result));
+ }
+ }
return buildRetInfo(retInfo, sBuffer);
}