You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/11 06:26:47 UTC
[inlong] branch master updated: [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap (#6134)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2276c45ff [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap (#6134)
2276c45ff is described below
commit 2276c45ff2a74ad43963265d33fc62f072a32b23
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Oct 11 14:26:42 2022 +0800
[INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap (#6134)
---
.../tubemq/server/broker/BrokerServiceServer.java | 4 +--
.../server/broker/metadata/BrokerDefMetadata.java | 3 +-
.../broker/msgstore/MessageStoreManager.java | 2 +-
.../server/broker/nodeinfo/ConsumerNodeInfo.java | 18 ++++++++---
.../server/broker/web/BrokerAdminServlet.java | 37 +++++++++++++++++-----
5 files changed, 47 insertions(+), 17 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 8d9f52da9..616f0d653 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -940,8 +940,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
int reqQryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
consumerNodeInfo =
- new ConsumerNodeInfo(storeManager, reqQryPriorityId, clientId,
- filterCondSet, reqSessionKey, reqSessionTime,
+ new ConsumerNodeInfo(storeManager, reqQryPriorityId, groupName,
+ clientId, filterCondSet, reqSessionKey, reqSessionTime,
true, partStr, msgRcvFrom);
if (consumerRegisterMap.put(partStr, consumerNodeInfo) == null) {
BrokerSrvStatsHolder.incConsumeOnlineCnt();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
index 6e6806fe1..cf06167ba 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
@@ -65,7 +65,8 @@ public class BrokerDefMetadata {
if (TStringUtils.isBlank(brokerDefMetaConfInfo)) {
return;
}
- String[] brokerDefaultConfInfoArr = brokerDefMetaConfInfo.split(TokenConstants.ATTR_SEP);
+ String[] brokerDefaultConfInfoArr =
+ brokerDefMetaConfInfo.split(TokenConstants.ATTR_SEP, -1);
this.numPartitions = Integer.parseInt(brokerDefaultConfInfoArr[0]);
this.acceptPublish = Boolean.parseBoolean(brokerDefaultConfInfoArr[1]);
this.acceptSubscribe = Boolean.parseBoolean(brokerDefaultConfInfoArr[2]);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 5d9299058..1856b8252 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -366,7 +366,7 @@ public class MessageStoreManager implements StoreService {
try {
final long maxOffset = msgStore.getIndexMaxOffset();
ConsumerNodeInfo consumerNodeInfo =
- new ConsumerNodeInfo(tubeBroker.getStoreManager(),
+ new ConsumerNodeInfo(tubeBroker.getStoreManager(), "visit",
"visit", filterCondSet, "", System.currentTimeMillis(), "", "");
int maxIndexReadSize = (msgCount + 1)
* DataStoreUtils.STORE_INDEX_HEAD_LEN * msgStore.getPartitionNum();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 2071ac474..b49e2c140 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -36,6 +36,7 @@ public class ConsumerNodeInfo {
private final MessageStoreManager storeManager;
// consumer id
private String consumerId;
+ private final String groupName;
private final String sessionKey;
private final long sessionTime;
// is filter consumer or not
@@ -66,6 +67,7 @@ public class ConsumerNodeInfo {
* Initial consumer node information
*
* @param storeManager the store manager
+ * @param groupName the group name
* @param consumerId the consumer id
* @param filterCodes the filter condition items
* @param sessionKey the session key
@@ -73,12 +75,13 @@ public class ConsumerNodeInfo {
* @param partStr the partition information
* @param msgRcvFrom the address message received
*/
- public ConsumerNodeInfo(MessageStoreManager storeManager,
+ public ConsumerNodeInfo(MessageStoreManager storeManager, String groupName,
String consumerId, Set<String> filterCodes,
String sessionKey, long sessionTime, String partStr,
String msgRcvFrom) {
- this(storeManager, TBaseConstants.META_VALUE_UNDEFINED, consumerId,
- filterCodes, sessionKey, sessionTime, false, partStr, msgRcvFrom);
+ this(storeManager, TBaseConstants.META_VALUE_UNDEFINED, groupName,
+ consumerId, filterCodes, sessionKey, sessionTime, false,
+ partStr, msgRcvFrom);
}
/**
@@ -94,8 +97,8 @@ public class ConsumerNodeInfo {
* @param partStr the partition information
* @param msgRcvFrom the address message received
*/
- public ConsumerNodeInfo(MessageStoreManager storeManager,
- int qryPriorityId, String consumerId,
+ public ConsumerNodeInfo(MessageStoreManager storeManager, int qryPriorityId,
+ String groupName, String consumerId,
Set<String> filterCodes, String sessionKey,
long sessionTime, boolean isSupportLimit,
String partStr, String msgRcvFrom) {
@@ -106,6 +109,7 @@ public class ConsumerNodeInfo {
this.filterCondCode.add(filterItem.hashCode());
}
}
+ this.groupName = groupName;
this.sessionKey = sessionKey;
this.sessionTime = sessionTime;
this.qryPriorityId.set(qryPriorityId);
@@ -154,6 +158,10 @@ public class ConsumerNodeInfo {
}
}
+ public String getGroupName() {
+ return groupName;
+ }
+
public String getPartStr() {
return partStr;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index 1f4b92cd6..11f368064 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -77,6 +77,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
innRegisterWebMethod("admin_snapshot_message",
"adminQuerySnapshotMessageSet", false);
// query broker's all consumer info
+ innRegisterWebMethod("admin_get_group_detail_info",
+ "adminQueryBrokerAllConsumerInfo", false);
+ // Replaced by admin_get_group_detail_info
innRegisterWebMethod("admin_query_broker_all_consumer_info",
"adminQueryBrokerAllConsumerInfo", false);
// get memory store status info
@@ -86,6 +89,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
innRegisterWebMethod("admin_query_broker_all_store_info",
"adminQueryBrokerAllMessageStoreInfo", false);
// query consumer register info
+ innRegisterWebMethod("admin_get_partition_reginfo",
+ "adminQueryConsumerRegisterInfo", false);
+ // Replaced by admin_get_partition_reginfo
innRegisterWebMethod("admin_query_consumer_regmap",
"adminQueryConsumerRegisterInfo", false);
// manual set offset
@@ -98,6 +104,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
innRegisterWebMethod("admin_query_pubinfo",
"adminQueryPubInfo", false);
// Query all consumer groups booked on the Broker.
+ innRegisterWebMethod("admin_get_booked_groupname",
+ "adminQueryBookedGroup", false);
+ // Replaced by admin_get_booked_groupname
innRegisterWebMethod("admin_query_group",
"adminQueryBookedGroup", false);
// query consumer group's offset
@@ -550,27 +559,39 @@ public class BrokerAdminServlet extends AbstractWebHandler {
* Query the consumed partition information of online consumer.
*
* @param req request
- * @param sBuffer process result
+ * @param strBuff process result
*/
public void adminQueryConsumerRegisterInfo(HttpServletRequest req,
- StringBuilder sBuffer) {
+ StringBuilder strBuff) {
+ ProcessResult result = new ProcessResult();
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, false, null, strBuff, result)) {
+ WebParameterUtils.buildFailResult(strBuff, result.getErrMsg());
+ return;
+ }
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // get online partition-client map
+ int totalCnt = 0;
Map<String, ConsumerNodeInfo> map =
broker.getBrokerServiceServer().getConsumerRegisterMap();
- int totalCnt = 0;
- sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
+ strBuff.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
}
+ if (!groupNameSet.isEmpty()
+ && !groupNameSet.contains(entry.getValue().getGroupName())) {
+ continue;
+ }
if (totalCnt++ > 0) {
- sBuffer.append(",");
+ strBuff.append(",");
}
- sBuffer.append("{\"Partition\":\"").append(entry.getKey())
+ strBuff.append("{\"Partition\":\"").append(entry.getKey())
.append("\",\"Consumer\":\"")
.append(entry.getValue().getConsumerId())
.append("\",\"index\":").append(totalCnt).append("}");
}
- sBuffer.append("],\"totalCnt\":").append(totalCnt).append("}");
+ strBuff.append("],\"totalCnt\":").append(totalCnt).append("}");
}
/**
@@ -854,7 +875,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
filterCodes.add(groupName);
// build consumer node information
ConsumerNodeInfo consumerNodeInfo = new ConsumerNodeInfo(broker.getStoreManager(),
- "offsetConsumer", filterCodes, "", System.currentTimeMillis(), "", "");
+ groupName, "offsetConsumer", filterCodes, "", System.currentTimeMillis(), "", "");
// query records from storage
int qryRetryCount = 0;
long itemInitOffset = requestOffset;