You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/11 06:26:47 UTC

[inlong] branch master updated: [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap (#6134)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2276c45ff [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap (#6134)
2276c45ff is described below

commit 2276c45ff2a74ad43963265d33fc62f072a32b23
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Oct 11 14:26:42 2022 +0800

    [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap (#6134)
---
 .../tubemq/server/broker/BrokerServiceServer.java  |  4 +--
 .../server/broker/metadata/BrokerDefMetadata.java  |  3 +-
 .../broker/msgstore/MessageStoreManager.java       |  2 +-
 .../server/broker/nodeinfo/ConsumerNodeInfo.java   | 18 ++++++++---
 .../server/broker/web/BrokerAdminServlet.java      | 37 +++++++++++++++++-----
 5 files changed, 47 insertions(+), 17 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 8d9f52da9..616f0d653 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -940,8 +940,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             int reqQryPriorityId = request.hasQryPriorityId()
                     ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
             consumerNodeInfo =
-                    new ConsumerNodeInfo(storeManager, reqQryPriorityId, clientId,
-                            filterCondSet, reqSessionKey, reqSessionTime,
+                    new ConsumerNodeInfo(storeManager, reqQryPriorityId, groupName,
+                            clientId, filterCondSet, reqSessionKey, reqSessionTime,
                             true, partStr, msgRcvFrom);
             if (consumerRegisterMap.put(partStr, consumerNodeInfo) == null) {
                 BrokerSrvStatsHolder.incConsumeOnlineCnt();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
index 6e6806fe1..cf06167ba 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
@@ -65,7 +65,8 @@ public class BrokerDefMetadata {
         if (TStringUtils.isBlank(brokerDefMetaConfInfo)) {
             return;
         }
-        String[] brokerDefaultConfInfoArr = brokerDefMetaConfInfo.split(TokenConstants.ATTR_SEP);
+        String[] brokerDefaultConfInfoArr =
+                brokerDefMetaConfInfo.split(TokenConstants.ATTR_SEP, -1);
         this.numPartitions = Integer.parseInt(brokerDefaultConfInfoArr[0]);
         this.acceptPublish = Boolean.parseBoolean(brokerDefaultConfInfoArr[1]);
         this.acceptSubscribe = Boolean.parseBoolean(brokerDefaultConfInfoArr[2]);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 5d9299058..1856b8252 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -366,7 +366,7 @@ public class MessageStoreManager implements StoreService {
         try {
             final long maxOffset = msgStore.getIndexMaxOffset();
             ConsumerNodeInfo consumerNodeInfo =
-                    new ConsumerNodeInfo(tubeBroker.getStoreManager(),
+                    new ConsumerNodeInfo(tubeBroker.getStoreManager(), "visit",
                             "visit", filterCondSet, "", System.currentTimeMillis(), "", "");
             int maxIndexReadSize = (msgCount + 1)
                     * DataStoreUtils.STORE_INDEX_HEAD_LEN * msgStore.getPartitionNum();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 2071ac474..b49e2c140 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -36,6 +36,7 @@ public class ConsumerNodeInfo {
     private final MessageStoreManager storeManager;
     // consumer id
     private String consumerId;
+    private final String groupName;
     private final String sessionKey;
     private final long sessionTime;
     // is filter consumer or not
@@ -66,6 +67,7 @@ public class ConsumerNodeInfo {
      * Initial consumer node information
      *
      * @param storeManager    the store manager
+     * @param groupName       the group name
      * @param consumerId      the consumer id
      * @param filterCodes     the filter condition items
      * @param sessionKey      the session key
@@ -73,12 +75,13 @@ public class ConsumerNodeInfo {
      * @param partStr         the partition information
      * @param msgRcvFrom      the address message received
      */
-    public ConsumerNodeInfo(MessageStoreManager storeManager,
+    public ConsumerNodeInfo(MessageStoreManager storeManager, String groupName,
                             String consumerId, Set<String> filterCodes,
                             String sessionKey, long sessionTime, String partStr,
                             String msgRcvFrom) {
-        this(storeManager, TBaseConstants.META_VALUE_UNDEFINED, consumerId,
-                filterCodes, sessionKey, sessionTime, false, partStr, msgRcvFrom);
+        this(storeManager, TBaseConstants.META_VALUE_UNDEFINED, groupName,
+                consumerId, filterCodes, sessionKey, sessionTime, false,
+                partStr, msgRcvFrom);
     }
 
     /**
@@ -94,8 +97,8 @@ public class ConsumerNodeInfo {
      * @param partStr            the partition information
      * @param msgRcvFrom         the address message received
      */
-    public ConsumerNodeInfo(MessageStoreManager storeManager,
-                            int qryPriorityId, String consumerId,
+    public ConsumerNodeInfo(MessageStoreManager storeManager, int qryPriorityId,
+                            String groupName, String consumerId,
                             Set<String> filterCodes, String sessionKey,
                             long sessionTime, boolean isSupportLimit,
                             String partStr, String msgRcvFrom) {
@@ -106,6 +109,7 @@ public class ConsumerNodeInfo {
                 this.filterCondCode.add(filterItem.hashCode());
             }
         }
+        this.groupName = groupName;
         this.sessionKey = sessionKey;
         this.sessionTime = sessionTime;
         this.qryPriorityId.set(qryPriorityId);
@@ -154,6 +158,10 @@ public class ConsumerNodeInfo {
         }
     }
 
+    public String getGroupName() {
+        return groupName;
+    }
+
     public String getPartStr() {
         return partStr;
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index 1f4b92cd6..11f368064 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -77,6 +77,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         innRegisterWebMethod("admin_snapshot_message",
                 "adminQuerySnapshotMessageSet", false);
         // query broker's all consumer info
+        innRegisterWebMethod("admin_get_group_detail_info",
+                "adminQueryBrokerAllConsumerInfo", false);
+        // Replaced by admin_get_group_detail_info
         innRegisterWebMethod("admin_query_broker_all_consumer_info",
                 "adminQueryBrokerAllConsumerInfo", false);
         // get memory store status info
@@ -86,6 +89,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         innRegisterWebMethod("admin_query_broker_all_store_info",
                 "adminQueryBrokerAllMessageStoreInfo", false);
         // query consumer register info
+        innRegisterWebMethod("admin_get_partition_reginfo",
+                "adminQueryConsumerRegisterInfo", false);
+        // Replaced by admin_get_partition_reginfo
         innRegisterWebMethod("admin_query_consumer_regmap",
                 "adminQueryConsumerRegisterInfo", false);
         // manual set offset
@@ -98,6 +104,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         innRegisterWebMethod("admin_query_pubinfo",
                 "adminQueryPubInfo", false);
         // Query all consumer groups booked on the Broker.
+        innRegisterWebMethod("admin_get_booked_groupname",
+                "adminQueryBookedGroup", false);
+        // Replaced by admin_get_booked_groupname
         innRegisterWebMethod("admin_query_group",
                 "adminQueryBookedGroup", false);
         // query consumer group's offset
@@ -550,27 +559,39 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      * Query the consumed partition information of online consumer.
      *
      * @param req      request
-     * @param sBuffer  process result
+     * @param strBuff  process result
      */
     public void adminQueryConsumerRegisterInfo(HttpServletRequest req,
-                                               StringBuilder sBuffer) {
+                                               StringBuilder strBuff) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, false, null, strBuff, result)) {
+            WebParameterUtils.buildFailResult(strBuff, result.getErrMsg());
+            return;
+        }
+        Set<String> groupNameSet = (Set<String>) result.getRetData();
+        // get online partition-client map
+        int totalCnt = 0;
         Map<String, ConsumerNodeInfo> map =
                 broker.getBrokerServiceServer().getConsumerRegisterMap();
-        int totalCnt = 0;
-        sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
+        strBuff.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
         for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) {
             if (entry.getKey() == null || entry.getValue() == null) {
                 continue;
             }
+            if (!groupNameSet.isEmpty()
+                    && !groupNameSet.contains(entry.getValue().getGroupName())) {
+                continue;
+            }
             if (totalCnt++ > 0) {
-                sBuffer.append(",");
+                strBuff.append(",");
             }
-            sBuffer.append("{\"Partition\":\"").append(entry.getKey())
+            strBuff.append("{\"Partition\":\"").append(entry.getKey())
                     .append("\",\"Consumer\":\"")
                     .append(entry.getValue().getConsumerId())
                     .append("\",\"index\":").append(totalCnt).append("}");
         }
-        sBuffer.append("],\"totalCnt\":").append(totalCnt).append("}");
+        strBuff.append("],\"totalCnt\":").append(totalCnt).append("}");
     }
 
     /**
@@ -854,7 +875,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         filterCodes.add(groupName);
         // build consumer node information
         ConsumerNodeInfo consumerNodeInfo = new ConsumerNodeInfo(broker.getStoreManager(),
-                "offsetConsumer", filterCodes, "", System.currentTimeMillis(), "", "");
+                groupName, "offsetConsumer", filterCodes, "", System.currentTimeMillis(), "", "");
         // query records from storage
         int qryRetryCount = 0;
         long itemInitOffset = requestOffset;