You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ti...@apache.org on 2021/09/17 17:04:50 UTC

[rocketmq] branch develop updated: [ISSUE #2964] Add a query consumer config command in mqadmin.

This is an automated email from the ASF dual-hosted git repository.

tigerlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 46bd4be  [ISSUE #2964] Add a query consumer config command in mqadmin.
     new a9837d9  Merge pull request #2965 from zhangjidi2016/add_getConsumerConfig_command
46bd4be is described below

commit 46bd4be66b1566e3386dc3f1d55b6cfb6ac207ce
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Wed Jun 2 09:52:17 2021 +0800

    [ISSUE #2964] Add a query consumer config command in mqadmin.
---
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   3 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |   6 +-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   2 +-
 .../rocketmq/tools/command/MQAdminStartup.java     |   2 +
 .../consumer/GetConsumerConfigSubCommand.java      | 146 +++++++++++++++++++++
 .../consumer/GetConsumerConfigSubCommandTest.java  |  83 ++++++++++++
 6 files changed, 238 insertions(+), 4 deletions(-)

diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6592639..4b83bb4 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -203,7 +203,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
+    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group)
+        throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
         return defaultMQAdminExtImpl.examineSubscriptionGroupConfig(addr, group);
     }
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 8930bbe..543f265 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -217,8 +217,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
-        return null;
+    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group)
+        throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        SubscriptionGroupWrapper wrapper = this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(addr, timeoutMillis);
+        return wrapper.getSubscriptionGroupTable().get(group);
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index d5462cb..188f14d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -90,7 +90,7 @@ public interface MQAdminExt extends MQAdmin {
         final SubscriptionGroupConfig config) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException;
 
-    SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group);
+    SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
 
     TopicConfig examineTopicConfig(final String addr, final String topic);
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index f947744..a055f7e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -49,6 +49,7 @@ import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand
 import org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand;
 import org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand;
 import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand;
+import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand;
 import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
 import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
 import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
@@ -202,6 +203,7 @@ public class MQAdminStartup {
         initCommand(new GetNamesrvConfigCommand());
         initCommand(new UpdateNamesrvConfigCommand());
         initCommand(new GetBrokerConfigCommand());
+        initCommand(new GetConsumerConfigSubCommand());
 
         initCommand(new QueryConsumeQueueCommand());
         initCommand(new SendMessageCommand());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
new file mode 100644
index 0000000..be2b946
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.consumer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class GetConsumerConfigSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "getConsumerConfig";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Get consumer config by subscription group name!";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("g", "groupName", true, "subscription group name");
+        opt.setRequired(true);
+        options.addOption(opt);
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options,
+        RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        String groupName = commandLine.getOptionValue('g').trim();
+        try {
+            adminExt.start();
+            List<ConsumerConfigInfo> consumerConfigInfoList = new ArrayList<>();
+            ClusterInfo clusterInfo = adminExt.examineBrokerClusterInfo();
+            Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+            for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) {
+                String clusterName = this.getClusterName(brokerName, clusterAddrTable);
+                String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
+                SubscriptionGroupConfig subscriptionGroupConfig = adminExt.examineSubscriptionGroupConfig(brokerAddress, groupName);
+                if (subscriptionGroupConfig == null) {
+                    continue;
+                }
+                consumerConfigInfoList.add(new ConsumerConfigInfo(clusterName, brokerName, subscriptionGroupConfig));
+            }
+            if (CollectionUtils.isEmpty(consumerConfigInfoList)) {
+                return;
+            }
+            for (ConsumerConfigInfo info : consumerConfigInfoList) {
+                System.out.printf("=============================%s:%s=============================\n",
+                    info.getClusterName(), info.getBrokerName());
+                SubscriptionGroupConfig config = info.getSubscriptionGroupConfig();
+                Field[] fields = config.getClass().getDeclaredFields();
+                for (Field field : fields) {
+                    field.setAccessible(true);
+                    if (field.get(config) != null) {
+                        System.out.printf("%s%-40s=  %s\n", "", field.getName(), field.get(config).toString());
+                    } else {
+                        System.out.printf("%s%-40s=  %s\n", "", field.getName(), "");
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+
+    private String getClusterName(String brokeName, Map<String, Set<String>> clusterAddrTable) {
+        for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {
+            Set<String> brokerNameSet = entry.getValue();
+            if (brokerNameSet.contains(brokeName)) {
+                return entry.getKey();
+            }
+        }
+        return null;
+    }
+}
+
+class ConsumerConfigInfo {
+    private String clusterName;
+
+    private String brokerName;
+
+    private SubscriptionGroupConfig subscriptionGroupConfig;
+
+    public ConsumerConfigInfo(String clusterName, String brokerName, SubscriptionGroupConfig subscriptionGroupConfig) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.subscriptionGroupConfig = subscriptionGroupConfig;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerNameList) {
+        this.brokerName = brokerName;
+    }
+
+    public SubscriptionGroupConfig getSubscriptionGroupConfig() {
+        return subscriptionGroupConfig;
+    }
+
+    public void setSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
+        this.subscriptionGroupConfig = subscriptionGroupConfig;
+    }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java
new file mode 100644
index 0000000..1ec68ff
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.consumer;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class GetConsumerConfigSubCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Ignore
+    @Test
+    public void testExecute() throws SubCommandException {
+        System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
+        GetConsumerConfigSubCommand cmd = new GetConsumerConfigSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-g group_test"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+}
\ No newline at end of file