You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/22 11:44:48 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-470] Add query API of TopicName and BrokerId collection

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 1191375  [TUBEMQ-470] Add query API of TopicName and BrokerId collection
1191375 is described below

commit 1191375c120f72908df4c8c10b833091bea72aa1
Author: gosonzhang <go...@tencent.com>
AuthorDate: Tue Dec 22 18:59:39 2020 +0800

    [TUBEMQ-470] Add query API of TopicName and BrokerId collection
---
 .../tubemq/server/common/fielddef/WebFieldDef.java |  10 ++-
 .../server/common/utils/WebParameterUtils.java     |  53 ++++++++++-
 .../nodemanage/nodebroker/BrokerConfManager.java   |  69 ++++++++++++++
 .../web/handler/WebBrokerTopicConfHandler.java     | 100 +++++++++++++++++++++
 4 files changed, 226 insertions(+), 6 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 44a6f81..f73959e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -64,9 +64,13 @@ public enum WebFieldDef {
     COMPSPARTITIONID(12, "partitionId", "pid", WebFieldType.COMPINT,
             "Partition id", RegexDef.TMP_NUMBER),
     CALLERIP(13, "callerIp", "cip", WebFieldType.STRING,
-            "Caller ip address", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH);
-
-
+            "Caller ip address", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH),
+    BROKERID(14, "brokerId", "brokerId", WebFieldType.INT,
+            "Broker ID", RegexDef.TMP_NUMBER),
+    COMPSBROKERID(15, "brokerId", "brokerId", WebFieldType.COMPINT,
+            "Broker ID", RegexDef.TMP_NUMBER),
+    WITHIP(16, "withIp", "ip", WebFieldType.BOOLEAN,
+            "Require return ip information, default is false");
 
 
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index b7d5d41..1202d33 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -283,6 +283,51 @@ public class WebParameterUtils {
      * @param req        Http Servlet Request
      * @param fieldDef   the parameter field definition
      * @param required   a boolean value represent whether the parameter is must required
+     * @return valid result for the parameter value
+     */
+    public static ProcessResult getIntParamValue(HttpServletRequest req,
+                                                 WebFieldDef fieldDef,
+                                                 boolean required) {
+        ProcessResult procResult =
+                getStringParamValue(req, fieldDef, required, null);
+        if (!procResult.success) {
+            return procResult;
+        }
+        ProcessResult procRet = new ProcessResult();
+        Set<Integer> tgtValueSet = new HashSet<Integer>();
+        if (fieldDef.isCompFieldType()) {
+            Set<String> valItemSet = (Set<String>) procResult.retData1;
+            if (valItemSet.isEmpty()) {
+                procResult.setSuccResult(tgtValueSet);
+                return procResult;
+            }
+            for (String itemVal : valItemSet) {
+                if (!checkIntValueNorms(procRet, fieldDef, itemVal, false, -1)) {
+                    return procRet;
+                }
+                tgtValueSet.add((Integer) procRet.retData1);
+            }
+        } else {
+            String paramValue = (String) procResult.retData1;
+            if (paramValue == null) {
+                procResult.setSuccResult(tgtValueSet);
+                return procResult;
+            }
+            if (!checkIntValueNorms(procRet,
+                    fieldDef, paramValue, false, -1)) {
+                tgtValueSet.add((Integer) procRet.retData1);
+            }
+        }
+        procResult.setSuccResult(tgtValueSet);
+        return procResult;
+    }
+
+    /**
+     * Parse the parameter value from an object value to a integer value
+     *
+     * @param req        Http Servlet Request
+     * @param fieldDef   the parameter field definition
+     * @param required   a boolean value represent whether the parameter is must required
      * @param defValue   a default value returned if failed to parse value from the given object
      * @param minValue   min value required
      * @return valid result for the parameter value
@@ -307,7 +352,7 @@ public class WebParameterUtils {
             }
             ProcessResult procRet = new ProcessResult();
             for (String itemVal : valItemSet) {
-                if (!checkIntValueNorms(procRet, fieldDef, itemVal, minValue)) {
+                if (!checkIntValueNorms(procRet, fieldDef, itemVal, true, minValue)) {
                     return procRet;
                 }
                 tgtValueSet.add((Integer) procRet.retData1);
@@ -319,7 +364,7 @@ public class WebParameterUtils {
                 procResult.setSuccResult(defValue);
                 return procResult;
             }
-            checkIntValueNorms(procResult, fieldDef, paramValue, minValue);
+            checkIntValueNorms(procResult, fieldDef, paramValue, true, minValue);
         }
         return procResult;
     }
@@ -501,16 +546,18 @@ public class WebParameterUtils {
      * @param procResult   process result
      * @param fieldDef     the parameter field definition
      * @param paramValue   the parameter value
+     * @param hasMinVal    whether there is a minimum
      * param minValue      the parameter min value
      * @return check result for string value of parameter
      */
     private static boolean checkIntValueNorms(ProcessResult procResult,
                                               WebFieldDef fieldDef,
                                               String paramValue,
+                                              boolean hasMinVal,
                                               int minValue) {
         try {
             int paramIntVal = Integer.parseInt(paramValue);
-            if (paramIntVal < minValue) {
+            if (hasMinVal && paramIntVal < minValue) {
                 procResult.setFailResult(400,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name).append(" value must >= ")
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
index 61e1fe3..698f0d3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
@@ -721,6 +721,75 @@ public class BrokerConfManager implements Server {
         return true;
     }
 
+    /**
+     * get broker's topicName set,
+     * if brokerIds is empty, then return all broker's topicNames
+     *
+     * @param brokerIdSet
+     * @return broker's topicName set
+     */
+    public Map<Integer, Set<String>> getBrokerTopicConfigInfo(Set<Integer> brokerIdSet) {
+        Map<Integer, Set<String>> result = new HashMap<>();
+        if (brokerIdSet.isEmpty()) {
+            for (ConcurrentHashMap.Entry<Integer,
+                    ConcurrentHashMap<String, BdbTopicConfEntity>>
+                    entry : brokerTopicEntityStoreMap.entrySet()) {
+                Set<String> topicSet = new HashSet<>();
+                if (entry.getValue() != null) {
+                    topicSet.addAll(entry.getValue().keySet());
+                }
+                result.put(entry.getKey(), topicSet);
+            }
+        } else {
+            for (Integer brokerId : brokerIdSet) {
+                ConcurrentHashMap<String, BdbTopicConfEntity> topicConfigMap =
+                        brokerTopicEntityStoreMap.get(brokerId);
+                Set<String> topicSet = new HashSet<>();
+                if (topicConfigMap != null && !topicConfigMap.isEmpty()) {
+                    topicSet.addAll(topicConfigMap.keySet());
+                }
+                result.put(brokerId, topicSet);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * get topic's brokerId set,
+     * if topicSet is empty, then return all topic's brokerIds
+     *
+     * @param topicNameSet
+     * @return topic's brokerId set
+     */
+    public Map<String, Map<Integer, String>> getTopicBrokerConfigInfo(Set<String> topicNameSet) {
+        Map<String, Map<Integer, String>> result = new HashMap<>();
+        if (topicNameSet.isEmpty()) {
+            for (Map<String, BdbTopicConfEntity> topicConfigMap
+                    : brokerTopicEntityStoreMap.values()) {
+                for (Map.Entry<String, BdbTopicConfEntity> entry
+                        : topicConfigMap.entrySet()) {
+                    Map<Integer, String> brokerInfos =
+                            result.computeIfAbsent(entry.getKey(), k -> new HashMap<>());
+                    brokerInfos.put(entry.getValue().getBrokerId(),
+                            entry.getValue().getBrokerIp());
+                }
+            }
+        } else {
+            for (Map<String, BdbTopicConfEntity> topicConfigMap
+                    : brokerTopicEntityStoreMap.values()) {
+                for (String topic : topicNameSet) {
+                    Map<Integer, String> brokerInfos =
+                            result.computeIfAbsent(topic, k -> new HashMap<>());
+                    BdbTopicConfEntity topicConfig = topicConfigMap.get(topic);
+                    if (topicConfig != null) {
+                        brokerInfos.put(topicConfig.getBrokerId(), topicConfig.getBrokerIp());
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
     public Set<String> getTotalConfiguredTopicNames() {
         Set<String> totalTopics = new HashSet<>(50);
         for (ConcurrentHashMap<String, BdbTopicConfEntity> tmpTopicCfgMap
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index bc14775..3bdfe16 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -33,6 +33,8 @@ import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.TServerConstants;
 import org.apache.tubemq.server.common.TStatusConstants;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.common.utils.WebParameterUtils;
 import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
@@ -66,6 +68,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
                 "adminQueryTopicCfgEntityAndRunInfo");
         registerQueryWebMethod("admin_query_broker_topic_config_info",
                 "adminQueryBrokerTopicCfgAndRunInfo");
+        registerQueryWebMethod("admin_query_topicName",
+                "adminQuerySimpleTopicName");
+        registerQueryWebMethod("admin_query_brokerId",
+                "adminQuerySimpleBrokerId");
         // register modify method
         registerModifyWebMethod("admin_add_new_topic_record",
                 "adminAddTopicEntityInfo");
@@ -702,6 +708,100 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
     }
 
     /**
+     * Query broker's topic-name set info
+     *
+     * @param req
+     * @return
+     */
+    public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) {
+        StringBuilder strBuffer = new StringBuilder(512);
+        ProcessResult result = WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, false);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
+            return strBuffer;
+        }
+        Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+        strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
+        Map<Integer, Set<String>> brokerTopicConfigMap =
+                brokerConfManager.getBrokerTopicConfigInfo(brokerIds);
+        int dataCount = 0;
+        for (Map.Entry<Integer, Set<String>> entry : brokerTopicConfigMap.entrySet()) {
+            if (dataCount++ > 0) {
+                strBuffer.append(",");
+            }
+            strBuffer.append("{\"brokerId\":").append(entry.getKey()).append(",\"topicName\":[");
+            int topicCnt = 0;
+            Set<String> topicSet = entry.getValue();
+            for (String topic : topicSet) {
+                if (topicCnt++ > 0) {
+                    strBuffer.append(",");
+                }
+                strBuffer.append("\"").append(topic).append("\"");
+            }
+            strBuffer.append("],\"topicCount\":").append(topicCnt).append("}");
+        }
+        strBuffer.append("],\"dataCount\":").append(dataCount).append("}");
+        return strBuffer;
+    }
+
+    /**
+     * Query topic's broker id set
+     *
+     * @param req
+     * @return
+     */
+    public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) {
+        StringBuilder strBuffer = new StringBuilder(512);
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
+            return strBuffer;
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        result = WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.WITHIP, false, false);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
+            return strBuffer;
+        }
+        boolean withIp = (Boolean) result.retData1;
+        // return result;
+        strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
+        Map<String, Map<Integer, String>> opicBrokerConfigMap =
+                brokerConfManager.getTopicBrokerConfigInfo(topicNameSet);
+        int dataCount = 0;
+        for (Map.Entry<String, Map<Integer, String>> entry : opicBrokerConfigMap.entrySet()) {
+            if (dataCount++ > 0) {
+                strBuffer.append(",");
+            }
+            strBuffer.append("{\"topicName\":\"").append(entry.getKey()).append("\",\"brokerInfo\":[");
+            int topicCnt = 0;
+            Map<Integer, String> brokerMap = entry.getValue();
+            if (withIp) {
+                for (Map.Entry<Integer, String> entry1 : brokerMap.entrySet()) {
+                    if (topicCnt++ > 0) {
+                        strBuffer.append(",");
+                    }
+                    strBuffer.append("{\"brokerId\":").append(entry1.getKey())
+                            .append(",\"brokerIp\":\"").append(entry1.getValue()).append("\"}");
+                }
+            } else {
+                for (Map.Entry<Integer, String> entry1 : brokerMap.entrySet()) {
+                    if (topicCnt++ > 0) {
+                        strBuffer.append(",");
+                    }
+                    strBuffer.append(entry1.getKey());
+                }
+            }
+            strBuffer.append("],\"brokerCnt\":").append(topicCnt).append("}");
+        }
+        strBuffer.append("],\"dataCount\":").append(dataCount).append("}");
+        return strBuffer;
+    }
+
+    /**
      * Delete topic info
      *
      * @param req