You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/25 12:26:24 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-484]Add query API
for topic publication information
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 8d1093a [TUBEMQ-484]Add query API for topic publication information
8d1093a is described below
commit 8d1093ad5ed231bd291c63a92a982294d2556f63
Author: gosonzhang <go...@tencent.com>
AuthorDate: Fri Dec 25 18:48:58 2020 +0800
[TUBEMQ-484]Add query API for topic publication information
---
.../server/broker/utils/GroupOffsetInfo.java | 8 ++--
.../server/broker/utils/TopicPubStoreInfo.java | 28 ++++++++----
.../server/broker/web/BrokerAdminServlet.java | 51 +++++++++++++++++++++-
3 files changed, 72 insertions(+), 15 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
index 9a4abe3..a0c7215 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
@@ -39,10 +39,10 @@ public class GroupOffsetInfo {
public void setPartPubStoreInfo(TopicPubStoreInfo pubStoreInfo) {
if (pubStoreInfo != null) {
- this.offsetMin = pubStoreInfo.indexStart;
- this.offsetMax = pubStoreInfo.indexEnd;
- this.dataMin = pubStoreInfo.dataStart;
- this.dataMax = pubStoreInfo.dataEnd;
+ this.offsetMin = pubStoreInfo.offsetMin;
+ this.offsetMax = pubStoreInfo.offsetMax;
+ this.dataMin = pubStoreInfo.dataMin;
+ this.dataMax = pubStoreInfo.dataMax;
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java
index b2257dd..73b9d69 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java
@@ -26,20 +26,30 @@ public class TopicPubStoreInfo {
public String topicName = null;
public int storeId = TBaseConstants.META_VALUE_UNDEFINED;
public int partitionId = TBaseConstants.META_VALUE_UNDEFINED;
- public long indexStart = 0L;
- public long indexEnd = 0L;
- public long dataStart = 0L;
- public long dataEnd = 0L;
+ public long offsetMin = 0L;
+ public long offsetMax = 0L;
+ public long dataMin = 0L;
+ public long dataMax = 0L;
public TopicPubStoreInfo(String topicName, int storeId, int partitionId,
- long indexStart, long indexEnd, long dataStart, long dataEnd) {
+ long offsetMin, long offsetMax, long dataMin, long dataMax) {
this.topicName = topicName;
this.storeId = storeId;
this.partitionId = partitionId;
- this.indexStart = indexStart;
- this.indexEnd = indexEnd;
- this.dataStart = dataStart;
- this.dataEnd = dataEnd;
+ this.offsetMin = offsetMin;
+ this.offsetMax = offsetMax;
+ this.dataMin = dataMin;
+ this.dataMax = dataMax;
+ }
+
+ public StringBuilder buildPubStoreInfo(StringBuilder sBuilder) {
+ sBuilder.append("{\"partitionId\":").append(partitionId)
+ .append(",\"offsetMin\":").append(offsetMin)
+ .append(",\"offsetMax\":").append(offsetMax)
+ .append(",\"dataMin\":").append(dataMin)
+ .append(",\"dataMax\":").append(dataMax)
+ .append("}");
+ return sBuilder;
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index 9a0506e..d8f85d4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -77,6 +77,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
// get all registered methods
innRegisterWebMethod("admin_get_methods",
"adminQueryAllMethods");
+ // query topic's publish info
+ innRegisterWebMethod("admin_query_pubinfo",
+ "adminQueryPubInfo");
// Query all consumer groups booked on the Broker.
innRegisterWebMethod("admin_query_group",
"adminQueryBookedGroup");
@@ -467,7 +470,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
* @throws Exception
*/
public void adminQueryCurrentGroupOffSet(HttpServletRequest req,
- StringBuilder sBuilder) throws Exception {
+ StringBuilder sBuilder) {
ProcessResult result = WebParameterUtils.getStringParamValue(req,
WebFieldDef.TOPICNAME, true, null);
if (!result.success) {
@@ -576,6 +579,50 @@ public class BrokerAdminServlet extends AbstractWebHandler {
}
/***
+ * Query topic's publish info on the Broker.
+ *
+ * @param req
+ * @param sBuilder process result
+ */
+ public void adminQueryPubInfo(HttpServletRequest req,
+ StringBuilder sBuilder) {
+ // get the topic set to be queried
+ ProcessResult result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return;
+ }
+ // get target consume group name
+ Set<String> topicSet = (Set<String>) result.retData1;
+ // get topic's publish info
+ Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap =
+ broker.getStoreManager().getTopicPublishInfos(topicSet);
+ // builder result
+ int totalCnt = 0;
+ sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
+ for (Map.Entry<String, Map<Integer, TopicPubStoreInfo>> entry
+ : topicStorePubInfoMap.entrySet()) {
+ if (totalCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("{\"topicName\":\"").append(entry.getKey())
+ .append("\",\"offsetInfo\":[");
+ Map<Integer, TopicPubStoreInfo> storeInfoMap = entry.getValue();
+ int itemCnt = 0;
+ for (Map.Entry<Integer, TopicPubStoreInfo> entry1 : storeInfoMap.entrySet()) {
+ if (itemCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ TopicPubStoreInfo pubStoreInfo = entry1.getValue();
+ pubStoreInfo.buildPubStoreInfo(sBuilder);
+ }
+ sBuilder.append("],\"itemCount\":").append(itemCnt).append("}");
+ }
+ sBuilder.append("],\"dataCount\":").append(totalCnt).append("}");
+ }
+
+ /***
* Query all consumer groups booked on the Broker.
*
* @param req
@@ -583,7 +630,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
*/
public void adminQueryBookedGroup(HttpServletRequest req,
StringBuilder sBuilder) {
- // get group list
+ // get divide info
ProcessResult result = WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.WITHDIVIDE, false, false);
if (!result.success) {