You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/03/01 09:07:20 UTC

[incubator-tubemq] 06/29: [TUBEMQ-515]Add cluster Topic view web api

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-469
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit f25db3019f374189b5012534843d5166d9733258
Author: gosonzhang <go...@tencent.com>
AuthorDate: Sat Jan 16 17:47:22 2021 +0800

    [TUBEMQ-515]Add cluster Topic view web api
---
 resources/assets/scripts/topicList.js              |   8 +-
 .../server/common/utils/WebParameterUtils.java     |  19 ++++
 .../web/handler/WebBrokerTopicConfHandler.java     |  33 ++----
 .../master/web/handler/WebMasterInfoHandler.java   | 115 ++++++++++++++++++++-
 4 files changed, 146 insertions(+), 29 deletions(-)

diff --git a/resources/assets/scripts/topicList.js b/resources/assets/scripts/topicList.js
index b755a94..7dc000a 100644
--- a/resources/assets/scripts/topicList.js
+++ b/resources/assets/scripts/topicList.js
@@ -102,7 +102,7 @@
                 'false': '否',
                 '-': '-'
             };
-            var url = G_CONFIG.HOST + "?type=op_query&method=admin_query_topic_info&" + $.param(
+            var url = G_CONFIG.HOST + "?type=op_query&method=admin_query_cluster_topic_view&" + $.param(
                     opts);
 
             if (!this.$topicListDataTable) {
@@ -126,7 +126,7 @@
                             return html;
                         }
                     }, {
-                        "data": "infoCount"
+                        "data": "totalCfgBrokerCnt"
                     }, {
                         "data": "totalCfgNumPart"
                     }, {
@@ -166,13 +166,13 @@
                                 + '"><input type="checkbox" checked></span>';
                         }
                     }, {
-                        "data": "authData",
+                        "data": "enableAuthControl",
                         "orderable": false,
                         "render": function (data,
                                             type,
                                             full,
                                             meta) {
-                            var checked = data.enableAuthControl
+                            var checked = data
                             === true
                                 ? ' checked'
                                 : '';
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 27e61e1..161966d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -34,6 +34,7 @@ import javax.servlet.http.HttpServletRequest;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.tubemq.server.common.TServerConstants;
 import org.apache.tubemq.server.common.TStatusConstants;
@@ -1401,6 +1402,24 @@ public class WebParameterUtils {
         return strManageStatus;
     }
 
+    public static Tuple2<Boolean, Boolean> getPubSubStatusByManageStatus(int manageStatus) {
+        boolean isAcceptPublish = false;
+        boolean isAcceptSubscribe = false;
+        if (manageStatus >= TStatusConstants.STATUS_MANAGE_ONLINE) {
+            if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) {
+                isAcceptPublish = true;
+                isAcceptSubscribe = true;
+            } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
+                isAcceptPublish = false;
+                isAcceptSubscribe = true;
+            } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
+                isAcceptPublish = true;
+                isAcceptSubscribe = false;
+            }
+        }
+        return new Tuple2<>(isAcceptPublish, isAcceptSubscribe);
+    }
+
     public static String date2yyyyMMddHHmmss(Date date) {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
         return sdf.format(date);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index 9a4cf04..74a4ed0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -33,6 +33,7 @@ import org.apache.tubemq.corebase.cluster.BrokerInfo;
 import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.SettingValidUtils;
 import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.common.TServerConstants;
 import org.apache.tubemq.server.common.TStatusConstants;
 import org.apache.tubemq.server.common.fielddef.WebFieldDef;
@@ -628,18 +629,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
                     if (brokerConfEntity != null) {
                         int manageStatus = brokerConfEntity.getManageStatus();
                         strManageStatus = WebParameterUtils.getBrokerManageStatusStr(manageStatus);
-                        if (manageStatus >= TStatusConstants.STATUS_MANAGE_ONLINE) {
-                            if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) {
-                                isAcceptPublish = true;
-                                isAcceptSubscribe = true;
-                            } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
-                                isAcceptPublish = false;
-                                isAcceptSubscribe = true;
-                            } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
-                                isAcceptPublish = true;
-                                isAcceptSubscribe = false;
-                            }
-                        }
+                        Tuple2<Boolean, Boolean> pubSubStatus =
+                                WebParameterUtils.getPubSubStatusByManageStatus(manageStatus);
+                        isAcceptPublish = pubSubStatus.getF0();
+                        isAcceptSubscribe = pubSubStatus.getF1();
                     }
                     BrokerInfo broker =
                             new BrokerInfo(entity.getBrokerId(), entity.getBrokerIp(), entity.getBrokerPort());
@@ -1243,18 +1236,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
                 if (brokerConfEntity != null) {
                     int manageStatus = brokerConfEntity.getManageStatus();
                     strManageStatus = WebParameterUtils.getBrokerManageStatusStr(manageStatus);
-                    if (manageStatus >= TStatusConstants.STATUS_MANAGE_ONLINE) {
-                        if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) {
-                            isAcceptPublish = true;
-                            isAcceptSubscribe = true;
-                        } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
-                            isAcceptPublish = false;
-                            isAcceptSubscribe = true;
-                        } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
-                            isAcceptPublish = true;
-                            isAcceptSubscribe = false;
-                        }
-                    }
+                    Tuple2<Boolean, Boolean> pubSubStatus =
+                            WebParameterUtils.getPubSubStatusByManageStatus(manageStatus);
+                    isAcceptPublish = pubSubStatus.getF0();
+                    isAcceptSubscribe = pubSubStatus.getF1();
                 }
                 BrokerSyncStatusInfo brokerSyncStatusInfo =
                         brokerSyncStatusInfoMap.get(brokerEntity.getBrokerId());
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index f2fece8..0e58fbb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -19,14 +19,25 @@ package org.apache.tubemq.server.master.web.handler;
 
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 import javax.servlet.http.HttpServletRequest;
 import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.SettingValidUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.common.utils.WebParameterUtils;
 import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
 import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
 import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
 
@@ -51,6 +62,9 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
                 "getGroupAddressStrInfo");
         registerQueryWebMethod("admin_query_cluster_default_setting",
                 "adminQueryClusterDefSetting");
+        registerQueryWebMethod("admin_query_cluster_topic_view",
+                "adminQueryClusterTopicView");
+
         // register modify method
         registerModifyWebMethod("admin_transfer_current_master",
                 "transferCurrentMaster");
@@ -210,7 +224,106 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
         return sBuilder;
     }
 
-
+    /**
+     * Query cluster topic overall view
+     *
+     * @param req
+     * @return
+     */
+    public StringBuilder adminQueryClusterTopicView(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
+        StringBuilder sBuilder = new StringBuilder(512);
+        // check and get brokerId field
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, false, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+        // check and get topicName field
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return sBuilder;
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        // query topic configure info
+        ConcurrentHashMap<String, List<BdbTopicConfEntity>> topicConfigMap =
+                brokerConfManager.getBdbTopicEntityMap(null);
+        TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+        int totalCount = 0;
+        int brokerCount = 0;
+        int totalCfgNumPartCount = 0;
+        int totalRunNumPartCount = 0;
+        boolean isSrvAcceptPublish = false;
+        boolean isSrvAcceptSubscribe = false;
+        boolean isAcceptPublish = false;
+        boolean isAcceptSubscribe = false;
+        boolean enableAuthControl = false;
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
+        for (Map.Entry<String, List<BdbTopicConfEntity>> entry : topicConfigMap.entrySet()) {
+            if (!topicNameSet.isEmpty() && !topicNameSet.contains(entry.getKey())) {
+                continue;
+            }
+            if (totalCount++ > 0) {
+                sBuilder.append(",");
+            }
+            brokerCount = 0;
+            totalCfgNumPartCount = 0;
+            totalRunNumPartCount = 0;
+            isSrvAcceptPublish = false;
+            isSrvAcceptSubscribe = false;
+            enableAuthControl = false;
+            isAcceptPublish = false;
+            isAcceptSubscribe = false;
+            for (BdbTopicConfEntity entity : entry.getValue()) {
+                if ((!brokerIds.isEmpty()) && (!brokerIds.contains(entity.getBrokerId()))) {
+                    continue;
+                }
+                brokerCount++;
+                totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
+                BdbBrokerConfEntity brokerConfEntity =
+                        brokerConfManager.getBrokerDefaultConfigStoreInfo(entity.getBrokerId());
+                if (brokerConfEntity != null) {
+                    Tuple2<Boolean, Boolean> pubSubStatus =
+                            WebParameterUtils.getPubSubStatusByManageStatus(
+                                    brokerConfEntity.getManageStatus());
+                    isAcceptPublish = pubSubStatus.getF0();
+                    isAcceptSubscribe = pubSubStatus.getF1();
+                }
+                BrokerInfo broker =
+                        new BrokerInfo(entity.getBrokerId(),
+                                entity.getBrokerIp(), entity.getBrokerPort());
+                TopicInfo topicInfo = topicPSInfoManager.getTopicInfo(
+                        entity.getTopicName(), broker);
+                if (topicInfo != null) {
+                    if (isAcceptPublish && topicInfo.isAcceptPublish()) {
+                        isSrvAcceptPublish = true;
+                    }
+                    if (isAcceptSubscribe && topicInfo.isAcceptSubscribe()) {
+                        isSrvAcceptSubscribe = true;
+                    }
+                    totalRunNumPartCount +=
+                            topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
+                }
+            }
+            BdbTopicAuthControlEntity authEntity =
+                    brokerConfManager.getBdbEnableAuthControlByTopicName(entry.getKey());
+            if (authEntity != null) {
+                enableAuthControl = authEntity.isEnableAuthControl();
+            }
+            sBuilder.append("{\"topicName\":\"").append(entry.getKey())
+                    .append("\",\"totalCfgBrokerCnt\":").append(brokerCount)
+                    .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
+                    .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount)
+                    .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
+                    .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
+                    .append(",\"enableAuthControl\":").append(enableAuthControl)
+                    .append("}");
+        }
+        sBuilder.append("],\"dataCount\":").append(totalCount).append("}");
+        return sBuilder;
+    }
 
 
 }