You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ti...@apache.org on 2021/09/17 17:04:50 UTC
[rocketmq] branch develop updated: [ISSUE #2964] Add a query
consumer config command in mqadmin.
This is an automated email from the ASF dual-hosted git repository.
tigerlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 46bd4be [ISSUE #2964] Add a query consumer config command in mqadmin.
new a9837d9 Merge pull request #2965 from zhangjidi2016/add_getConsumerConfig_command
46bd4be is described below
commit 46bd4be66b1566e3386dc3f1d55b6cfb6ac207ce
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Wed Jun 2 09:52:17 2021 +0800
[ISSUE #2964] Add a query consumer config command in mqadmin.
---
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 3 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 6 +-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 2 +-
.../rocketmq/tools/command/MQAdminStartup.java | 2 +
.../consumer/GetConsumerConfigSubCommand.java | 146 +++++++++++++++++++++
.../consumer/GetConsumerConfigSubCommandTest.java | 83 ++++++++++++
6 files changed, 238 insertions(+), 4 deletions(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6592639..4b83bb4 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -203,7 +203,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
+ public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group)
+ throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return defaultMQAdminExtImpl.examineSubscriptionGroupConfig(addr, group);
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 8930bbe..543f265 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -217,8 +217,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
- return null;
+ public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group)
+ throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ SubscriptionGroupWrapper wrapper = this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(addr, timeoutMillis);
+ return wrapper.getSubscriptionGroupTable().get(group);
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index d5462cb..188f14d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -90,7 +90,7 @@ public interface MQAdminExt extends MQAdmin {
final SubscriptionGroupConfig config) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
- SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group);
+ SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
TopicConfig examineTopicConfig(final String addr, final String topic);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index f947744..a055f7e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -49,6 +49,7 @@ import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand
import org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand;
import org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand;
import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand;
+import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand;
import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
@@ -202,6 +203,7 @@ public class MQAdminStartup {
initCommand(new GetNamesrvConfigCommand());
initCommand(new UpdateNamesrvConfigCommand());
initCommand(new GetBrokerConfigCommand());
+ initCommand(new GetConsumerConfigSubCommand());
initCommand(new QueryConsumeQueueCommand());
initCommand(new SendMessageCommand());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
new file mode 100644
index 0000000..be2b946
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.consumer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class GetConsumerConfigSubCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "getConsumerConfig";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get consumer config by subscription group name!";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("g", "groupName", true, "subscription group name");
+ opt.setRequired(true);
+ options.addOption(opt);
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+ DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+ adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ String groupName = commandLine.getOptionValue('g').trim();
+ try {
+ adminExt.start();
+ List<ConsumerConfigInfo> consumerConfigInfoList = new ArrayList<>();
+ ClusterInfo clusterInfo = adminExt.examineBrokerClusterInfo();
+ Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) {
+ String clusterName = this.getClusterName(brokerName, clusterAddrTable);
+ String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
+ SubscriptionGroupConfig subscriptionGroupConfig = adminExt.examineSubscriptionGroupConfig(brokerAddress, groupName);
+ if (subscriptionGroupConfig == null) {
+ continue;
+ }
+ consumerConfigInfoList.add(new ConsumerConfigInfo(clusterName, brokerName, subscriptionGroupConfig));
+ }
+ if (CollectionUtils.isEmpty(consumerConfigInfoList)) {
+ return;
+ }
+ for (ConsumerConfigInfo info : consumerConfigInfoList) {
+ System.out.printf("=============================%s:%s=============================\n",
+ info.getClusterName(), info.getBrokerName());
+ SubscriptionGroupConfig config = info.getSubscriptionGroupConfig();
+ Field[] fields = config.getClass().getDeclaredFields();
+ for (Field field : fields) {
+ field.setAccessible(true);
+ if (field.get(config) != null) {
+ System.out.printf("%s%-40s= %s\n", "", field.getName(), field.get(config).toString());
+ } else {
+ System.out.printf("%s%-40s= %s\n", "", field.getName(), "");
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ adminExt.shutdown();
+ }
+ }
+
+ private String getClusterName(String brokeName, Map<String, Set<String>> clusterAddrTable) {
+ for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {
+ Set<String> brokerNameSet = entry.getValue();
+ if (brokerNameSet.contains(brokeName)) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+}
+
+class ConsumerConfigInfo {
+ private String clusterName;
+
+ private String brokerName;
+
+ private SubscriptionGroupConfig subscriptionGroupConfig;
+
+ public ConsumerConfigInfo(String clusterName, String brokerName, SubscriptionGroupConfig subscriptionGroupConfig) {
+ this.clusterName = clusterName;
+ this.brokerName = brokerName;
+ this.subscriptionGroupConfig = subscriptionGroupConfig;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerNameList) {
+ this.brokerName = brokerName;
+ }
+
+ public SubscriptionGroupConfig getSubscriptionGroupConfig() {
+ return subscriptionGroupConfig;
+ }
+
+ public void setSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
+ this.subscriptionGroupConfig = subscriptionGroupConfig;
+ }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java
new file mode 100644
index 0000000..1ec68ff
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.consumer;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class GetConsumerConfigSubCommandTest {
+ private static DefaultMQAdminExt defaultMQAdminExt;
+ private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+ private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+ private static MQClientAPIImpl mQClientAPIImpl;
+
+ @BeforeClass
+ public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+ mQClientAPIImpl = mock(MQClientAPIImpl.class);
+ defaultMQAdminExt = new DefaultMQAdminExt();
+ defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+ Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+ field.setAccessible(true);
+ field.set(defaultMQAdminExtImpl, mqClientInstance);
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mqClientInstance, mQClientAPIImpl);
+ field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+ field.setAccessible(true);
+ field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ }
+
+ @AfterClass
+ public static void terminate() {
+ defaultMQAdminExt.shutdown();
+ }
+
+ @Ignore
+ @Test
+ public void testExecute() throws SubCommandException {
+ System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
+ GetConsumerConfigSubCommand cmd = new GetConsumerConfigSubCommand();
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ String[] subargs = new String[] {"-g group_test"};
+ final CommandLine commandLine =
+ ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+ cmd.execute(commandLine, options, null);
+ }
+}
\ No newline at end of file