You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:43 UTC

[rocketmq] 02/26: [ISSUE #5406] Support getConsumerIdList

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 4b9816e4a911f656470c70b4528e3a0517aa2090
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Mon Oct 31 16:44:50 2022 +0800

    [ISSUE #5406] Support getConsumerIdList
---
 .../remoting/activity/ConsumerManagerActivity.java      | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
index fb248a894..734b1dad1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
@@ -20,11 +20,17 @@ package org.apache.rocketmq.proxy.remoting.activity;
 import io.netty.channel.ChannelHandlerContext;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
 import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
@@ -64,8 +70,15 @@ public class ConsumerManagerActivity extends AbstractRemotingActivity {
 
     protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request,
         ProxyContext context) throws Exception {
-        // TODO after connection-related module
-        return null;
+        RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
+        GetConsumerListByGroupRequestHeader header = (GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
+        ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup());
+        List<String> clientIds = consumerGroupInfo.getAllClientId();
+        GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
+        body.setConsumerIdList(clientIds);
+        response.setBody(body.encode());
+        response.setCode(ResponseCode.SUCCESS);
+        return response;
     }
 
     protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request,