You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/25 12:26:24 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-484]Add query API for topic publication information

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d1093a  [TUBEMQ-484]Add query API for topic publication information
8d1093a is described below

commit 8d1093ad5ed231bd291c63a92a982294d2556f63
Author: gosonzhang <go...@tencent.com>
AuthorDate: Fri Dec 25 18:48:58 2020 +0800

    [TUBEMQ-484]Add query API for topic publication information
---
 .../server/broker/utils/GroupOffsetInfo.java       |  8 ++--
 .../server/broker/utils/TopicPubStoreInfo.java     | 28 ++++++++----
 .../server/broker/web/BrokerAdminServlet.java      | 51 +++++++++++++++++++++-
 3 files changed, 72 insertions(+), 15 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
index 9a4abe3..a0c7215 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
@@ -39,10 +39,10 @@ public class GroupOffsetInfo {
 
     public void setPartPubStoreInfo(TopicPubStoreInfo pubStoreInfo) {
         if (pubStoreInfo != null) {
-            this.offsetMin = pubStoreInfo.indexStart;
-            this.offsetMax = pubStoreInfo.indexEnd;
-            this.dataMin = pubStoreInfo.dataStart;
-            this.dataMax = pubStoreInfo.dataEnd;
+            this.offsetMin = pubStoreInfo.offsetMin;
+            this.offsetMax = pubStoreInfo.offsetMax;
+            this.dataMin = pubStoreInfo.dataMin;
+            this.dataMax = pubStoreInfo.dataMax;
         }
     }
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java
index b2257dd..73b9d69 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java
@@ -26,20 +26,30 @@ public class TopicPubStoreInfo {
     public String topicName = null;
     public int storeId = TBaseConstants.META_VALUE_UNDEFINED;
     public int partitionId = TBaseConstants.META_VALUE_UNDEFINED;
-    public long indexStart = 0L;
-    public long indexEnd = 0L;
-    public long dataStart = 0L;
-    public long dataEnd = 0L;
+    public long offsetMin = 0L;
+    public long offsetMax = 0L;
+    public long dataMin = 0L;
+    public long dataMax = 0L;
 
     public TopicPubStoreInfo(String topicName, int storeId, int partitionId,
-                             long indexStart, long indexEnd, long dataStart, long dataEnd) {
+                             long offsetMin, long offsetMax, long dataMin, long dataMax) {
         this.topicName = topicName;
         this.storeId = storeId;
         this.partitionId = partitionId;
-        this.indexStart = indexStart;
-        this.indexEnd = indexEnd;
-        this.dataStart = dataStart;
-        this.dataEnd = dataEnd;
+        this.offsetMin = offsetMin;
+        this.offsetMax = offsetMax;
+        this.dataMin = dataMin;
+        this.dataMax = dataMax;
+    }
+
+    public StringBuilder buildPubStoreInfo(StringBuilder sBuilder) {
+        sBuilder.append("{\"partitionId\":").append(partitionId)
+                .append(",\"offsetMin\":").append(offsetMin)
+                .append(",\"offsetMax\":").append(offsetMax)
+                .append(",\"dataMin\":").append(dataMin)
+                .append(",\"dataMax\":").append(dataMax)
+                .append("}");
+        return sBuilder;
     }
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index 9a0506e..d8f85d4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -77,6 +77,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         // get all registered methods
         innRegisterWebMethod("admin_get_methods",
                 "adminQueryAllMethods");
+        // query topic's publish info
+        innRegisterWebMethod("admin_query_pubinfo",
+                "adminQueryPubInfo");
         // Query all consumer groups booked on the Broker.
         innRegisterWebMethod("admin_query_group",
                 "adminQueryBookedGroup");
@@ -467,7 +470,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      * @throws Exception
      */
     public void adminQueryCurrentGroupOffSet(HttpServletRequest req,
-                                             StringBuilder sBuilder) throws Exception {
+                                             StringBuilder sBuilder) {
         ProcessResult result = WebParameterUtils.getStringParamValue(req,
                 WebFieldDef.TOPICNAME, true, null);
         if (!result.success) {
@@ -576,6 +579,50 @@ public class BrokerAdminServlet extends AbstractWebHandler {
     }
 
     /***
+     * Query topic's publish info on the Broker.
+     *
+     * @param req      the HTTP request carrying the query parameters
+     * @param sBuilder buffer that receives the JSON-formatted process result
+     */
+    public void adminQueryPubInfo(HttpServletRequest req,
+                                  StringBuilder sBuilder) {
+        // get the topic set to be queried
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null);
+        if (!result.success) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        // extract the parsed topic name set from the parameter result
+        Set<String> topicSet = (Set<String>) result.retData1;
+        // get topic's publish info
+        Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap =
+                broker.getStoreManager().getTopicPublishInfos(topicSet);
+        // build the JSON result: one dataSet entry per topic, one offsetInfo entry per map item
+        int totalCnt = 0;
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
+        for (Map.Entry<String, Map<Integer, TopicPubStoreInfo>> entry
+                : topicStorePubInfoMap.entrySet()) {
+            if (totalCnt++ > 0) {
+                sBuilder.append(",");
+            }
+            sBuilder.append("{\"topicName\":\"").append(entry.getKey())
+                    .append("\",\"offsetInfo\":[");
+            Map<Integer, TopicPubStoreInfo> storeInfoMap = entry.getValue();
+            int itemCnt = 0;
+            for (Map.Entry<Integer, TopicPubStoreInfo> entry1 : storeInfoMap.entrySet()) {
+                if (itemCnt++ > 0) {
+                    sBuilder.append(",");
+                }
+                TopicPubStoreInfo pubStoreInfo = entry1.getValue();
+                pubStoreInfo.buildPubStoreInfo(sBuilder);
+            }
+            sBuilder.append("],\"itemCount\":").append(itemCnt).append("}");
+        }
+        sBuilder.append("],\"dataCount\":").append(totalCnt).append("}");
+    }
+
+    /***
      * Query all consumer groups booked on the Broker.
      *
      * @param req
@@ -583,7 +630,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminQueryBookedGroup(HttpServletRequest req,
                                       StringBuilder sBuilder) {
-        // get group list
+        // get divide info
         ProcessResult result = WebParameterUtils.getBooleanParamValue(req,
                 WebFieldDef.WITHDIVIDE, false, false);
         if (!result.success) {