You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:43 UTC
[rocketmq] 02/26: [ISSUE #5406] Support getConsumerIdList
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 4b9816e4a911f656470c70b4528e3a0517aa2090
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Mon Oct 31 16:44:50 2022 +0800
[ISSUE #5406] Support getConsumerIdList
---
.../remoting/activity/ConsumerManagerActivity.java | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
index fb248a894..734b1dad1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
@@ -20,11 +20,17 @@ package org.apache.rocketmq.proxy.remoting.activity;
import io.netty.channel.ChannelHandlerContext;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
@@ -64,8 +70,15 @@ public class ConsumerManagerActivity extends AbstractRemotingActivity {
protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
- // TODO after connection-related module
- return null;
+ RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
+ GetConsumerListByGroupRequestHeader header = (GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
+ ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup());
+ List<String> clientIds = consumerGroupInfo.getAllClientId();
+ GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
+ body.setConsumerIdList(clientIds);
+ response.setBody(body.encode());
+ response.setCode(ResponseCode.SUCCESS);
+ return response;
}
protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request,