You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:40 UTC
[incubator-tubemq] 42/49: [TUBEMQ-515]Add cluster Topic view web api
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 4b138de7fef71c68b9fc35f7996890cb127c2129
Author: gosonzhang <go...@tencent.com>
AuthorDate: Sat Jan 16 17:47:22 2021 +0800
[TUBEMQ-515]Add cluster Topic view web api
---
resources/assets/scripts/topicList.js | 8 +-
.../server/common/utils/WebParameterUtils.java | 19 ++++
.../web/handler/WebBrokerTopicConfHandler.java | 33 ++----
.../master/web/handler/WebMasterInfoHandler.java | 115 ++++++++++++++++++++-
4 files changed, 146 insertions(+), 29 deletions(-)
diff --git a/resources/assets/scripts/topicList.js b/resources/assets/scripts/topicList.js
index b755a94..7dc000a 100644
--- a/resources/assets/scripts/topicList.js
+++ b/resources/assets/scripts/topicList.js
@@ -102,7 +102,7 @@
'false': '否',
'-': '-'
};
- var url = G_CONFIG.HOST + "?type=op_query&method=admin_query_topic_info&" + $.param(
+ var url = G_CONFIG.HOST + "?type=op_query&method=admin_query_cluster_topic_view&" + $.param(
opts);
if (!this.$topicListDataTable) {
@@ -126,7 +126,7 @@
return html;
}
}, {
- "data": "infoCount"
+ "data": "totalCfgBrokerCnt"
}, {
"data": "totalCfgNumPart"
}, {
@@ -166,13 +166,13 @@
+ '"><input type="checkbox" checked></span>';
}
}, {
- "data": "authData",
+ "data": "enableAuthControl",
"orderable": false,
"render": function (data,
type,
full,
meta) {
- var checked = data.enableAuthControl
+ var checked = data
=== true
? ' checked'
: '';
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 27e61e1..161966d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -34,6 +34,7 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.TStatusConstants;
@@ -1401,6 +1402,24 @@ public class WebParameterUtils {
return strManageStatus;
}
+ public static Tuple2<Boolean, Boolean> getPubSubStatusByManageStatus(int manageStatus) {
+ boolean isAcceptPublish = false;
+ boolean isAcceptSubscribe = false;
+ if (manageStatus >= TStatusConstants.STATUS_MANAGE_ONLINE) {
+ if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) {
+ isAcceptPublish = true;
+ isAcceptSubscribe = true;
+ } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
+ isAcceptPublish = false;
+ isAcceptSubscribe = true;
+ } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
+ isAcceptPublish = true;
+ isAcceptSubscribe = false;
+ }
+ }
+ return new Tuple2<>(isAcceptPublish, isAcceptSubscribe);
+ }
+
public static String date2yyyyMMddHHmmss(Date date) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
return sdf.format(date);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index 9a4cf04..74a4ed0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -33,6 +33,7 @@ import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.SettingValidUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
@@ -628,18 +629,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
if (brokerConfEntity != null) {
int manageStatus = brokerConfEntity.getManageStatus();
strManageStatus = WebParameterUtils.getBrokerManageStatusStr(manageStatus);
- if (manageStatus >= TStatusConstants.STATUS_MANAGE_ONLINE) {
- if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) {
- isAcceptPublish = true;
- isAcceptSubscribe = true;
- } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
- isAcceptPublish = false;
- isAcceptSubscribe = true;
- } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
- isAcceptPublish = true;
- isAcceptSubscribe = false;
- }
- }
+ Tuple2<Boolean, Boolean> pubSubStatus =
+ WebParameterUtils.getPubSubStatusByManageStatus(manageStatus);
+ isAcceptPublish = pubSubStatus.getF0();
+ isAcceptSubscribe = pubSubStatus.getF1();
}
BrokerInfo broker =
new BrokerInfo(entity.getBrokerId(), entity.getBrokerIp(), entity.getBrokerPort());
@@ -1243,18 +1236,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
if (brokerConfEntity != null) {
int manageStatus = brokerConfEntity.getManageStatus();
strManageStatus = WebParameterUtils.getBrokerManageStatusStr(manageStatus);
- if (manageStatus >= TStatusConstants.STATUS_MANAGE_ONLINE) {
- if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) {
- isAcceptPublish = true;
- isAcceptSubscribe = true;
- } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
- isAcceptPublish = false;
- isAcceptSubscribe = true;
- } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
- isAcceptPublish = true;
- isAcceptSubscribe = false;
- }
- }
+ Tuple2<Boolean, Boolean> pubSubStatus =
+ WebParameterUtils.getPubSubStatusByManageStatus(manageStatus);
+ isAcceptPublish = pubSubStatus.getF0();
+ isAcceptSubscribe = pubSubStatus.getF1();
}
BrokerSyncStatusInfo brokerSyncStatusInfo =
brokerSyncStatusInfoMap.get(brokerEntity.getBrokerId());
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index f2fece8..0e58fbb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -19,14 +19,25 @@ package org.apache.tubemq.server.master.web.handler;
import java.util.Date;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.SettingValidUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
@@ -51,6 +62,9 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
"getGroupAddressStrInfo");
registerQueryWebMethod("admin_query_cluster_default_setting",
"adminQueryClusterDefSetting");
+ registerQueryWebMethod("admin_query_cluster_topic_view",
+ "adminQueryClusterTopicView");
+
// register modify method
registerModifyWebMethod("admin_transfer_current_master",
"transferCurrentMaster");
@@ -210,7 +224,106 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
return sBuilder;
}
-
+ /**
+ * Query cluster topic overall view
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminQueryClusterTopicView(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+ // check and get topicName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // query topic configure info
+ ConcurrentHashMap<String, List<BdbTopicConfEntity>> topicConfigMap =
+ brokerConfManager.getBdbTopicEntityMap(null);
+ TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+ int totalCount = 0;
+ int brokerCount = 0;
+ int totalCfgNumPartCount = 0;
+ int totalRunNumPartCount = 0;
+ boolean isSrvAcceptPublish = false;
+ boolean isSrvAcceptSubscribe = false;
+ boolean isAcceptPublish = false;
+ boolean isAcceptSubscribe = false;
+ boolean enableAuthControl = false;
+ sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
+ for (Map.Entry<String, List<BdbTopicConfEntity>> entry : topicConfigMap.entrySet()) {
+ if (!topicNameSet.isEmpty() && !topicNameSet.contains(entry.getKey())) {
+ continue;
+ }
+ if (totalCount++ > 0) {
+ sBuilder.append(",");
+ }
+ brokerCount = 0;
+ totalCfgNumPartCount = 0;
+ totalRunNumPartCount = 0;
+ isSrvAcceptPublish = false;
+ isSrvAcceptSubscribe = false;
+ enableAuthControl = false;
+ isAcceptPublish = false;
+ isAcceptSubscribe = false;
+ for (BdbTopicConfEntity entity : entry.getValue()) {
+ if ((!brokerIds.isEmpty()) && (!brokerIds.contains(entity.getBrokerId()))) {
+ continue;
+ }
+ brokerCount++;
+ totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
+ BdbBrokerConfEntity brokerConfEntity =
+ brokerConfManager.getBrokerDefaultConfigStoreInfo(entity.getBrokerId());
+ if (brokerConfEntity != null) {
+ Tuple2<Boolean, Boolean> pubSubStatus =
+ WebParameterUtils.getPubSubStatusByManageStatus(
+ brokerConfEntity.getManageStatus());
+ isAcceptPublish = pubSubStatus.getF0();
+ isAcceptSubscribe = pubSubStatus.getF1();
+ }
+ BrokerInfo broker =
+ new BrokerInfo(entity.getBrokerId(),
+ entity.getBrokerIp(), entity.getBrokerPort());
+ TopicInfo topicInfo = topicPSInfoManager.getTopicInfo(
+ entity.getTopicName(), broker);
+ if (topicInfo != null) {
+ if (isAcceptPublish && topicInfo.isAcceptPublish()) {
+ isSrvAcceptPublish = true;
+ }
+ if (isAcceptSubscribe && topicInfo.isAcceptSubscribe()) {
+ isSrvAcceptSubscribe = true;
+ }
+ totalRunNumPartCount +=
+ topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
+ }
+ }
+ BdbTopicAuthControlEntity authEntity =
+ brokerConfManager.getBdbEnableAuthControlByTopicName(entry.getKey());
+ if (authEntity != null) {
+ enableAuthControl = authEntity.isEnableAuthControl();
+ }
+ sBuilder.append("{\"topicName\":\"").append(entry.getKey())
+ .append("\",\"totalCfgBrokerCnt\":").append(brokerCount)
+ .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
+ .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount)
+ .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
+ .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
+ .append(",\"enableAuthControl\":").append(enableAuthControl)
+ .append("}");
+ }
+ sBuilder.append("],\"dataCount\":").append(totalCount).append("}");
+ return sBuilder;
+ }
}