You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/22 11:44:48 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-470] Add query
API of TopicName and BrokerId collection
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 1191375 [TUBEMQ-470] Add query API of TopicName and BrokerId collection
1191375 is described below
commit 1191375c120f72908df4c8c10b833091bea72aa1
Author: gosonzhang <go...@tencent.com>
AuthorDate: Tue Dec 22 18:59:39 2020 +0800
[TUBEMQ-470] Add query API of TopicName and BrokerId collection
---
.../tubemq/server/common/fielddef/WebFieldDef.java | 10 ++-
.../server/common/utils/WebParameterUtils.java | 53 ++++++++++-
.../nodemanage/nodebroker/BrokerConfManager.java | 69 ++++++++++++++
.../web/handler/WebBrokerTopicConfHandler.java | 100 +++++++++++++++++++++
4 files changed, 226 insertions(+), 6 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 44a6f81..f73959e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -64,9 +64,13 @@ public enum WebFieldDef {
COMPSPARTITIONID(12, "partitionId", "pid", WebFieldType.COMPINT,
"Partition id", RegexDef.TMP_NUMBER),
CALLERIP(13, "callerIp", "cip", WebFieldType.STRING,
- "Caller ip address", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH);
-
-
+ "Caller ip address", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH),
+ BROKERID(14, "brokerId", "brokerId", WebFieldType.INT,
+ "Broker ID", RegexDef.TMP_NUMBER),
+ COMPSBROKERID(15, "brokerId", "brokerId", WebFieldType.COMPINT,
+ "Broker ID", RegexDef.TMP_NUMBER),
+ WITHIP(16, "withIp", "ip", WebFieldType.BOOLEAN,
+ "Require return ip information, default is false");
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index b7d5d41..1202d33 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -283,6 +283,51 @@ public class WebParameterUtils {
* @param req Http Servlet Request
* @param fieldDef the parameter field definition
* @param required a boolean value represent whether the parameter is must required
+ * @return valid result for the parameter value
+ */
+ public static ProcessResult getIntParamValue(HttpServletRequest req,
+ WebFieldDef fieldDef,
+ boolean required) {
+ ProcessResult procResult =
+ getStringParamValue(req, fieldDef, required, null);
+ if (!procResult.success) {
+ return procResult;
+ }
+ ProcessResult procRet = new ProcessResult();
+ Set<Integer> tgtValueSet = new HashSet<Integer>();
+ if (fieldDef.isCompFieldType()) {
+ Set<String> valItemSet = (Set<String>) procResult.retData1;
+ if (valItemSet.isEmpty()) {
+ procResult.setSuccResult(tgtValueSet);
+ return procResult;
+ }
+ for (String itemVal : valItemSet) {
+ if (!checkIntValueNorms(procRet, fieldDef, itemVal, false, -1)) {
+ return procRet;
+ }
+ tgtValueSet.add((Integer) procRet.retData1);
+ }
+ } else {
+ String paramValue = (String) procResult.retData1;
+ if (paramValue == null) {
+ procResult.setSuccResult(tgtValueSet);
+ return procResult;
+ }
+ if (!checkIntValueNorms(procRet,
+ fieldDef, paramValue, false, -1)) {
+ tgtValueSet.add((Integer) procRet.retData1);
+ }
+ }
+ procResult.setSuccResult(tgtValueSet);
+ return procResult;
+ }
+
+ /**
+ * Parse the parameter value from an object value to a integer value
+ *
+ * @param req Http Servlet Request
+ * @param fieldDef the parameter field definition
+ * @param required a boolean value represent whether the parameter is must required
* @param defValue a default value returned if failed to parse value from the given object
* @param minValue min value required
* @return valid result for the parameter value
@@ -307,7 +352,7 @@ public class WebParameterUtils {
}
ProcessResult procRet = new ProcessResult();
for (String itemVal : valItemSet) {
- if (!checkIntValueNorms(procRet, fieldDef, itemVal, minValue)) {
+ if (!checkIntValueNorms(procRet, fieldDef, itemVal, true, minValue)) {
return procRet;
}
tgtValueSet.add((Integer) procRet.retData1);
@@ -319,7 +364,7 @@ public class WebParameterUtils {
procResult.setSuccResult(defValue);
return procResult;
}
- checkIntValueNorms(procResult, fieldDef, paramValue, minValue);
+ checkIntValueNorms(procResult, fieldDef, paramValue, true, minValue);
}
return procResult;
}
@@ -501,16 +546,18 @@ public class WebParameterUtils {
* @param procResult process result
* @param fieldDef the parameter field definition
* @param paramValue the parameter value
+ * @param hasMinVal whether there is a minimum
* param minValue the parameter min value
* @return check result for string value of parameter
*/
private static boolean checkIntValueNorms(ProcessResult procResult,
WebFieldDef fieldDef,
String paramValue,
+ boolean hasMinVal,
int minValue) {
try {
int paramIntVal = Integer.parseInt(paramValue);
- if (paramIntVal < minValue) {
+ if (hasMinVal && paramIntVal < minValue) {
procResult.setFailResult(400,
new StringBuilder(512).append("Parameter ")
.append(fieldDef.name).append(" value must >= ")
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
index 61e1fe3..698f0d3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
@@ -721,6 +721,75 @@ public class BrokerConfManager implements Server {
return true;
}
+ /**
+ * get broker's topicName set,
+ * if brokerIds is empty, then return all broker's topicNames
+ *
+ * @param brokerIdSet
+ * @return broker's topicName set
+ */
+ public Map<Integer, Set<String>> getBrokerTopicConfigInfo(Set<Integer> brokerIdSet) {
+ Map<Integer, Set<String>> result = new HashMap<>();
+ if (brokerIdSet.isEmpty()) {
+ for (ConcurrentHashMap.Entry<Integer,
+ ConcurrentHashMap<String, BdbTopicConfEntity>>
+ entry : brokerTopicEntityStoreMap.entrySet()) {
+ Set<String> topicSet = new HashSet<>();
+ if (entry.getValue() != null) {
+ topicSet.addAll(entry.getValue().keySet());
+ }
+ result.put(entry.getKey(), topicSet);
+ }
+ } else {
+ for (Integer brokerId : brokerIdSet) {
+ ConcurrentHashMap<String, BdbTopicConfEntity> topicConfigMap =
+ brokerTopicEntityStoreMap.get(brokerId);
+ Set<String> topicSet = new HashSet<>();
+ if (topicConfigMap != null && !topicConfigMap.isEmpty()) {
+ topicSet.addAll(topicConfigMap.keySet());
+ }
+ result.put(brokerId, topicSet);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * get topic's brokerId set,
+ * if topicSet is empty, then return all topic's brokerIds
+ *
+ * @param topicNameSet
+ * @return topic's brokerId set
+ */
+ public Map<String, Map<Integer, String>> getTopicBrokerConfigInfo(Set<String> topicNameSet) {
+ Map<String, Map<Integer, String>> result = new HashMap<>();
+ if (topicNameSet.isEmpty()) {
+ for (Map<String, BdbTopicConfEntity> topicConfigMap
+ : brokerTopicEntityStoreMap.values()) {
+ for (Map.Entry<String, BdbTopicConfEntity> entry
+ : topicConfigMap.entrySet()) {
+ Map<Integer, String> brokerInfos =
+ result.computeIfAbsent(entry.getKey(), k -> new HashMap<>());
+ brokerInfos.put(entry.getValue().getBrokerId(),
+ entry.getValue().getBrokerIp());
+ }
+ }
+ } else {
+ for (Map<String, BdbTopicConfEntity> topicConfigMap
+ : brokerTopicEntityStoreMap.values()) {
+ for (String topic : topicNameSet) {
+ Map<Integer, String> brokerInfos =
+ result.computeIfAbsent(topic, k -> new HashMap<>());
+ BdbTopicConfEntity topicConfig = topicConfigMap.get(topic);
+ if (topicConfig != null) {
+ brokerInfos.put(topicConfig.getBrokerId(), topicConfig.getBrokerIp());
+ }
+ }
+ }
+ }
+ return result;
+ }
+
public Set<String> getTotalConfiguredTopicNames() {
Set<String> totalTopics = new HashSet<>(50);
for (ConcurrentHashMap<String, BdbTopicConfEntity> tmpTopicCfgMap
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index bc14775..3bdfe16 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -33,6 +33,8 @@ import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.TStatusConstants;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
@@ -66,6 +68,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
"adminQueryTopicCfgEntityAndRunInfo");
registerQueryWebMethod("admin_query_broker_topic_config_info",
"adminQueryBrokerTopicCfgAndRunInfo");
+ registerQueryWebMethod("admin_query_topicName",
+ "adminQuerySimpleTopicName");
+ registerQueryWebMethod("admin_query_brokerId",
+ "adminQuerySimpleBrokerId");
// register modify method
registerModifyWebMethod("admin_add_new_topic_record",
"adminAddTopicEntityInfo");
@@ -702,6 +708,100 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
}
/**
+ * Query broker's topic-name set info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) {
+ StringBuilder strBuffer = new StringBuilder(512);
+ ProcessResult result = WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, false);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
+ return strBuffer;
+ }
+ Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+ strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
+ Map<Integer, Set<String>> brokerTopicConfigMap =
+ brokerConfManager.getBrokerTopicConfigInfo(brokerIds);
+ int dataCount = 0;
+ for (Map.Entry<Integer, Set<String>> entry : brokerTopicConfigMap.entrySet()) {
+ if (dataCount++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer.append("{\"brokerId\":").append(entry.getKey()).append(",\"topicName\":[");
+ int topicCnt = 0;
+ Set<String> topicSet = entry.getValue();
+ for (String topic : topicSet) {
+ if (topicCnt++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer.append("\"").append(topic).append("\"");
+ }
+ strBuffer.append("],\"topicCount\":").append(topicCnt).append("}");
+ }
+ strBuffer.append("],\"dataCount\":").append(dataCount).append("}");
+ return strBuffer;
+ }
+
+ /**
+ * Query topic's broker id set
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) {
+ StringBuilder strBuffer = new StringBuilder(512);
+ ProcessResult result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
+ return strBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ result = WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.WITHIP, false, false);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
+ return strBuffer;
+ }
+ boolean withIp = (Boolean) result.retData1;
+ // return result;
+ strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
+ Map<String, Map<Integer, String>> opicBrokerConfigMap =
+ brokerConfManager.getTopicBrokerConfigInfo(topicNameSet);
+ int dataCount = 0;
+ for (Map.Entry<String, Map<Integer, String>> entry : opicBrokerConfigMap.entrySet()) {
+ if (dataCount++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer.append("{\"topicName\":\"").append(entry.getKey()).append("\",\"brokerInfo\":[");
+ int topicCnt = 0;
+ Map<Integer, String> brokerMap = entry.getValue();
+ if (withIp) {
+ for (Map.Entry<Integer, String> entry1 : brokerMap.entrySet()) {
+ if (topicCnt++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer.append("{\"brokerId\":").append(entry1.getKey())
+ .append(",\"brokerIp\":\"").append(entry1.getValue()).append("\"}");
+ }
+ } else {
+ for (Map.Entry<Integer, String> entry1 : brokerMap.entrySet()) {
+ if (topicCnt++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer.append(entry1.getKey());
+ }
+ }
+ strBuffer.append("],\"brokerCnt\":").append(topicCnt).append("}");
+ }
+ strBuffer.append("],\"dataCount\":").append(dataCount).append("}");
+ return strBuffer;
+ }
+
+ /**
* Delete topic info
*
* @param req