You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/10/13 11:47:59 UTC
[rocketmq] branch develop updated: [ISSUE #3148]Support metadata
export (#3149)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new cd45023 [ISSUE #3148]Support metadata export (#3149)
cd45023 is described below
commit cd45023d61c6184cf798a66e9d83e00cd8c8f4be
Author: panzhi <pa...@qq.com>
AuthorDate: Wed Oct 13 19:47:51 2021 +0800
[ISSUE #3148]Support metadata export (#3149)
---
distribution/bin/export.sh | 89 +++++++
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 18 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 57 ++++-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 10 +-
.../apache/rocketmq/tools/command/CommandUtil.java | 1 -
.../rocketmq/tools/command/MQAdminStartup.java | 7 +
.../tools/command/export/ExportConfigsCommand.java | 128 ++++++++++
.../command/export/ExportMetadataCommand.java | 184 ++++++++++++++
.../tools/command/export/ExportMetricsCommand.java | 282 +++++++++++++++++++++
9 files changed, 771 insertions(+), 5 deletions(-)
diff --git a/distribution/bin/export.sh b/distribution/bin/export.sh
new file mode 100644
index 0000000..2b323e8
--- /dev/null
+++ b/distribution/bin/export.sh
@@ -0,0 +1,89 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Derive ROCKETMQ_HOME from the location of this script unless the caller
+# has already set it.
+if [ -z "$ROCKETMQ_HOME" ]; then
+ ## resolve links - $0 may be a symlink to the real script
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ]; do
+ ls=$(ls -ld "$PRG")
+ link=$(expr "$ls" : '.*-> \(.*\)$')
+ if expr "$link" : '/.*' >/dev/null; then
+ PRG="$link"
+ else
+ PRG="$(dirname "$PRG")/$link"
+ fi
+ done
+
+ saveddir=$(pwd)
+
+ # home is taken to be the parent of the directory that holds the script
+ ROCKETMQ_HOME=$(dirname "$PRG")/..
+
+ # make it fully qualified
+ ROCKETMQ_HOME=$(cd "$ROCKETMQ_HOME" && pwd)
+
+ cd "$saveddir"
+fi
+
+# exported so that child processes started below can read it
+export ROCKETMQ_HOME
+
+# Ask for the name server list; an empty answer re-prompts.
+namesrvAddr=
+while [ -z "${namesrvAddr}" ]; do
+  read -r -p "Enter name server address list:" namesrvAddr
+done
+
+# Ask for the cluster to export; an empty answer re-prompts.
+clusterName=
+while [ -z "${clusterName}" ]; do
+  read -r -p "Choose a cluster to export:" clusterName
+done
+
+# The fallback must match both the prompt text and the default used by the
+# mqadmin export commands (/tmp/rocketmq/export).
+read -r -p "Enter file path to export [default /tmp/rocketmq/export]:" filePath
+if [ -z "${filePath}" ]; then
+  filePath="/tmp/rocketmq/export"
+fi
+
+# The target is wiped before exporting, so never accept the filesystem root.
+if [ "${filePath}" = "/" ]; then
+  echo "[ERROR] Refusing to export into '/': the target path is deleted before exporting" >&2
+  exit 1
+fi
+
+# Start from a clean target; quoting keeps paths with spaces or globs intact.
+if [[ -e "${filePath}" ]]; then
+  rm -rf -- "${filePath}"
+fi
+
+# Run the three exporters; each one writes a single JSON file into ${filePath}.
+sh "${ROCKETMQ_HOME}/bin/mqadmin" exportMetrics -c "${clusterName}" -n "${namesrvAddr}" -f "${filePath}"
+sh "${ROCKETMQ_HOME}/bin/mqadmin" exportConfigs -c "${clusterName}" -n "${namesrvAddr}" -f "${filePath}"
+sh "${ROCKETMQ_HOME}/bin/mqadmin" exportMetadata -c "${clusterName}" -n "${namesrvAddr}" -f "${filePath}"
+
+cd "${filePath}" || exit 1
+
+# Print the content of the JSON file $1, or "{}" when the file is missing or
+# empty, so that the merged document below is always valid JSON.
+read_json_or_empty() {
+  local content=""
+  if [ -f "$1" ]; then
+    content=$(cat "$1")
+  else
+    echo "[WARN] $1 was not generated, using an empty object instead" >&2
+  fi
+  if [ -z "$content" ]; then
+    content="{}"
+  fi
+  printf '%s' "$content"
+}
+
+configs=$(read_json_or_empty ./configs.json)
+metadata=$(read_json_or_empty ./metadata.json)
+metrics=$(read_json_or_empty ./metrics.json)
+
+# Merge the three partial exports into one document.
+echo "{
+  \"configs\": ${configs},
+  \"metadata\": ${metadata},
+  \"metrics\": ${metrics}
+  }" >rocketmq-metadata-export.json
+
+echo -e "[INFO] The RocketMQ metadata has been exported to the file:${filePath}/rocketmq-metadata-export.json"
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index a007225..c3e3a30 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -501,10 +501,24 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
+ public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
- return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis);
+ return this.defaultMQAdminExtImpl.getUserSubscriptionGroup(brokerAddr, timeoutMillis);
+ }
+
+ /**
+ * Returns the topic config wrapper reported by the broker at {@code brokerAddr}.
+ * Pure delegation to {@code defaultMQAdminExtImpl.getAllTopicConfig}.
+ *
+ * @param brokerAddr address of the broker to query
+ * @param timeoutMillis timeout handed through to the implementation
+ */
+ @Override
+ public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
+ long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException {
+ return this.defaultMQAdminExtImpl.getAllTopicConfig(brokerAddr, timeoutMillis);
+ }
+
+ /**
+ * Returns the topic config wrapper of the broker at {@code brokerAddr} after the
+ * implementation has filtered it. Pure delegation to
+ * {@code defaultMQAdminExtImpl.getUserTopicConfig}.
+ *
+ * @param brokerAddr address of the broker to query
+ * @param specialTopic handed through unchanged; the implementation keeps retry and DLQ
+ * topics only when this is {@code true}
+ * @param timeoutMillis timeout handed through to the implementation
+ */
+ @Override
+ public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
+ long timeoutMillis) throws InterruptedException, RemotingException,
+ MQBrokerException, MQClientException {
+ return this.defaultMQAdminExtImpl.getUserTopicConfig(brokerAddr, specialTopic, timeoutMillis);
}
/* (non-Javadoc)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 66ecc54..80999c2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -99,6 +99,24 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
private long timeoutMillis = 20000;
private Random random = new Random();
+ // Group names that getUserSubscriptionGroup treats as system groups and removes
+ // from its result (in addition to whatever MixAll.isSysConsumerGroup matches).
+ private static final Set<String> SYSTEM_GROUP_SET = new HashSet<String>();
+
+ // Filled once at class-initialization time from the MixAll group-name constants.
+ static {
+ SYSTEM_GROUP_SET.add(MixAll.DEFAULT_CONSUMER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.DEFAULT_PRODUCER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.MONITOR_CONSUMER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.CLIENT_INNER_PRODUCER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_PRODUCER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_CONSUMER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.ONS_HTTP_PROXY_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PERMISSION_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_OWNER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PULL_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS);
+ }
+
public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) {
this(defaultMQAdminExt, null, timeoutMillis);
}
@@ -964,13 +982,50 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
+    public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
+        long timeoutMillis) throws InterruptedException,
+        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        // Start from every subscription group the broker reports ...
+        SubscriptionGroupWrapper wrapper =
+            this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
+
+        // ... and strip the system groups in place, so only user groups remain.
+        for (Iterator<Entry<String, SubscriptionGroupConfig>> it =
+             wrapper.getSubscriptionGroupTable().entrySet().iterator(); it.hasNext(); ) {
+            String group = it.next().getKey();
+            boolean systemGroup = MixAll.isSysConsumerGroup(group) || SYSTEM_GROUP_SET.contains(group);
+            if (systemGroup) {
+                it.remove();
+            }
+        }
+
+        return wrapper;
+    }
+
+ @Override
+ public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis);
}
@Override
+    public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
+        long timeoutMillis) throws InterruptedException, RemotingException,
+        MQBrokerException, MQClientException {
+        // Full topic table of the broker, filtered in place below.
+        TopicConfigSerializeWrapper wrapper = this.getAllTopicConfig(brokerAddr, timeoutMillis);
+        TopicList systemTopics = this.mqClientInstance.getMQClientAPIImpl()
+            .getSystemTopicListFromBroker(brokerAddr, timeoutMillis);
+
+        Iterator<Entry<String, TopicConfig>> it = wrapper.getTopicConfigTable().entrySet().iterator();
+        while (it.hasNext()) {
+            String topic = it.next().getKey();
+
+            // System topics are never part of the user view.
+            if (systemTopics.getTopicList().contains(topic)) {
+                it.remove();
+                continue;
+            }
+
+            // Retry and DLQ topics are kept only when the caller asked for special topics.
+            if (specialTopic) {
+                continue;
+            }
+            if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
+                || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+                it.remove();
+            }
+        }
+        return wrapper;
+    }
+
+ @Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 94791d0..82c1cbd 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -238,10 +238,18 @@ public interface MQAdminExt extends MQAdmin {
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
- TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
+ SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
+ /**
+ * Fetches the topic config wrapper from the broker at {@code brokerAddr}, unfiltered.
+ *
+ * @param brokerAddr address of the broker to query
+ * @param timeoutMillis request timeout
+ */
+ TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
+ long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException;
+
+ /**
+ * Fetches the topic configs of the broker at {@code brokerAddr} without the broker's
+ * system topics.
+ *
+ * @param brokerAddr address of the broker to query
+ * @param specialTopic when {@code false}, topics whose name starts with the retry or DLQ
+ * prefix are dropped as well
+ * @param timeoutMillis request timeout
+ */
+ TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
+ long timeoutMillis) throws InterruptedException, RemotingException,
+ MQBrokerException, MQClientException;
+
void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
long offset) throws RemotingException, InterruptedException, MQBrokerException;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
index 2e65f98..8984ca6 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
@@ -146,5 +146,4 @@ public class CommandUtil {
}
throw new Exception(ERROR_MESSAGE);
}
-
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 4411a6c..e8572bd 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -52,6 +52,8 @@ import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand
import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand;
import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
+import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
+import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
@@ -74,6 +76,7 @@ import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
+import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
import org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicListSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand;
@@ -217,6 +220,10 @@ public class MQAdminStartup {
initCommand(new ClusterAclConfigVersionListSubCommand());
initCommand(new UpdateGlobalWhiteAddrSubCommand());
initCommand(new GetAccessConfigSubCommand());
+
+ initCommand(new ExportMetadataCommand());
+ initCommand(new ExportConfigsCommand());
+ initCommand(new ExportMetricsCommand());
}
private static void initLogback() throws JoranException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java
new file mode 100644
index 0000000..c3ca9d3
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.export;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.alibaba.fastjson.JSON;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * {@code mqadmin exportConfigs}: writes {@code configs.json}, holding a selected subset of every
+ * master broker's configuration plus the cluster scale (name server, master and slave counts).
+ */
+public class ExportConfigsCommand implements SubCommand {
+
+    /** Target directory used when {@code -f} is not given. */
+    private static final String DEFAULT_FILE_PATH = "/tmp/rocketmq/export";
+
+    /** Broker properties that are copied into the export. */
+    private static final String[] EXPORTED_BROKER_PROPERTIES = {
+        "brokerClusterName",
+        "brokerId",
+        "brokerName",
+        "brokerRole",
+        "fileReservedTime",
+        "filterServerNums",
+        "flushDiskType",
+        "maxMessageSize",
+        "messageDelayLevel",
+        "msgTraceTopicName",
+        "slaveReadEnable",
+        "traceOn",
+        "traceTopicEnable",
+        "useTLS",
+        "autoCreateTopicEnable",
+        "autoCreateSubscriptionGroup"
+    };
+
+    @Override
+    public String commandName() {
+        return "exportConfigs";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "export configs";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("f", "filePath", true,
+            "export configs.json path | default /tmp/rocketmq/export");
+        opt.setRequired(false);
+        options.addOption(opt);
+        return options;
+    }
+
+    /**
+     * Collects the broker configs and the cluster scale of the cluster named by {@code -c} and
+     * writes them as pretty-printed JSON to {@code <filePath>/configs.json}.
+     *
+     * @throws SubCommandException wrapping any failure while querying the cluster or writing the file
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+        throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            String clusterName = commandLine.getOptionValue('c').trim();
+            String filePath = commandLine.hasOption('f')
+                ? commandLine.getOptionValue('f').trim()
+                : DEFAULT_FILE_PATH;
+
+            defaultMQAdminExt.start();
+
+            // name servers
+            List<String> nameServerAddressList = defaultMQAdminExt.getNameServerAddressList();
+            int namesrvSize = nameServerAddressList == null ? 0 : nameServerAddressList.size();
+
+            // brokers: one config entry per master, slaves are only counted
+            int masterBrokerSize = 0;
+            int slaveBrokerSize = 0;
+            Map<String, Properties> brokerConfigs = new HashMap<>();
+            Map<String, List<String>> masterAndSlaveMap
+                = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
+            for (Map.Entry<String, List<String>> entry : masterAndSlaveMap.entrySet()) {
+                Properties masterProperties = defaultMQAdminExt.getBrokerConfig(entry.getKey());
+                masterBrokerSize++;
+                slaveBrokerSize += entry.getValue().size();
+
+                brokerConfigs.put(masterProperties.getProperty("brokerName"), needBrokerProprties(masterProperties));
+            }
+
+            Map<String, Integer> clusterScaleMap = new HashMap<>();
+            clusterScaleMap.put("namesrvSize", namesrvSize);
+            clusterScaleMap.put("masterBrokerSize", masterBrokerSize);
+            clusterScaleMap.put("slaveBrokerSize", slaveBrokerSize);
+
+            Map<String, Object> result = new HashMap<>();
+            result.put("brokerConfigs", brokerConfigs);
+            result.put("clusterScale", clusterScaleMap);
+
+            String path = filePath + "/configs.json";
+            MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path);
+            System.out.printf("export %s success%n", path);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    /**
+     * Copies the exported subset of broker properties into a fresh {@link Properties}.
+     *
+     * <p>A key the broker did not report is skipped: {@link Properties#setProperty} rejects
+     * {@code null} values with a {@link NullPointerException}, which would otherwise abort the
+     * whole export.
+     *
+     * @param properties full configuration returned by the broker
+     * @return a new instance holding only the exported keys that are present
+     */
+    private Properties needBrokerProprties(Properties properties) {
+        Properties newProperties = new Properties();
+        for (String key : EXPORTED_BROKER_PROPERTIES) {
+            String value = properties.getProperty(key);
+            if (value != null) {
+                newProperties.setProperty(key, value);
+            }
+        }
+        return newProperties;
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java
new file mode 100644
index 0000000..1909436
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.export;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.alibaba.fastjson.JSON;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * {@code mqadmin exportMetadata}: exports user topic configs and subscription groups of a single
+ * broker ({@code -b}) or of a whole cluster ({@code -c}) as pretty-printed JSON files.
+ */
+public class ExportMetadataCommand implements SubCommand {
+
+    private static final String DEFAULT_FILE_PATH = "/tmp/rocketmq/export";
+
+    /** Timeout applied to every broker request issued by this command. */
+    private static final long TIMEOUT_MILLIS = 10000L;
+
+    @Override
+    public String commandName() {
+        return "exportMetadata";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "export metadata";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerAddr", true, "choose a broker to export");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("f", "filePath", true, "export metadata.json path | default /tmp/rocketmq/export");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", false, "only export topic metadata");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "subscriptionGroup", false, "only export subscriptionGroup metadata");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("s", "specialTopic", false, "need retryTopic and dlqTopic");
+        opt.setRequired(false);
+        options.addOption(opt);
+        return options;
+    }
+
+    /**
+     * Dispatches on the given options:
+     * <ul>
+     * <li>{@code -b} with {@code -t} or {@code -g}: export that broker's topics or groups;</li>
+     * <li>{@code -c} with {@code -t} or {@code -g}: export the topics or groups of the first master
+     * returned for the cluster;</li>
+     * <li>{@code -c} alone: merge all masters into a single {@code metadata.json};</li>
+     * <li>anything else: print the command help.</li>
+     * </ul>
+     * When both {@code -t} and {@code -g} are given, {@code -t} wins.
+     *
+     * @throws SubCommandException wrapping any failure while querying brokers or writing files
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+        throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            String filePath = commandLine.hasOption('f')
+                ? commandLine.getOptionValue('f').trim()
+                : DEFAULT_FILE_PATH;
+            boolean specialTopic = commandLine.hasOption('s');
+            boolean topicOnly = commandLine.hasOption('t');
+            boolean groupOnly = !topicOnly && commandLine.hasOption('g');
+
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+                if (topicOnly) {
+                    exportTopics(defaultMQAdminExt, brokerAddr, specialTopic, filePath);
+                } else if (groupOnly) {
+                    exportSubscriptionGroups(defaultMQAdminExt, brokerAddr, filePath);
+                } else {
+                    // -b needs -t or -g; say so instead of silently doing nothing.
+                    ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+                }
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                Set<String> masterSet =
+                    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                if (masterSet.isEmpty()) {
+                    // No master found for the cluster: nothing to export.
+                    return;
+                }
+
+                if (topicOnly) {
+                    exportTopics(defaultMQAdminExt, masterSet.iterator().next(), specialTopic, filePath);
+                } else if (groupOnly) {
+                    exportSubscriptionGroups(defaultMQAdminExt, masterSet.iterator().next(), filePath);
+                } else {
+                    exportClusterMetadata(defaultMQAdminExt, masterSet, specialTopic, filePath);
+                }
+            } else {
+                ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    /** Writes the user topic configs of one broker to {@code <dir>/topic.json}. */
+    private void exportTopics(DefaultMQAdminExt admin, String brokerAddr, boolean specialTopic,
+        String dir) throws Exception {
+        TopicConfigSerializeWrapper topics = admin.getUserTopicConfig(brokerAddr, specialTopic, TIMEOUT_MILLIS);
+        writeJson(topics, dir + "/topic.json");
+    }
+
+    /** Writes the user subscription groups of one broker to {@code <dir>/subscriptionGroup.json}. */
+    private void exportSubscriptionGroups(DefaultMQAdminExt admin, String brokerAddr,
+        String dir) throws Exception {
+        SubscriptionGroupWrapper groups = admin.getUserSubscriptionGroup(brokerAddr, TIMEOUT_MILLIS);
+        writeJson(groups, dir + "/subscriptionGroup.json");
+    }
+
+    /**
+     * Merges topics and subscription groups of all masters and writes one
+     * {@code <dir>/metadata.json}.
+     *
+     * <p>The file is written once, after every master has been visited. Building the path and
+     * writing inside the loop would target {@code metadata.json/metadata.json} for the second
+     * master and fail.
+     */
+    private void exportClusterMetadata(DefaultMQAdminExt admin, Set<String> masterSet, boolean specialTopic,
+        String dir) throws Exception {
+        Map<String, TopicConfig> topicConfigMap = new HashMap<>();
+        Map<String, SubscriptionGroupConfig> subGroupConfigMap = new HashMap<>();
+
+        for (String addr : masterSet) {
+            TopicConfigSerializeWrapper topics = admin.getUserTopicConfig(addr, specialTopic, TIMEOUT_MILLIS);
+            for (Map.Entry<String, TopicConfig> entry : topics.getTopicConfigTable().entrySet()) {
+                TopicConfig current = entry.getValue();
+                TopicConfig merged = topicConfigMap.get(entry.getKey());
+                if (null != merged) {
+                    // Same topic on several masters: queue counts add up.
+                    current.setWriteQueueNums(merged.getWriteQueueNums() + current.getWriteQueueNums());
+                    current.setReadQueueNums(merged.getReadQueueNums() + current.getReadQueueNums());
+                }
+                topicConfigMap.put(entry.getKey(), current);
+            }
+
+            SubscriptionGroupWrapper groups = admin.getUserSubscriptionGroup(addr, TIMEOUT_MILLIS);
+            for (Map.Entry<String, SubscriptionGroupConfig> entry : groups.getSubscriptionGroupTable().entrySet()) {
+                SubscriptionGroupConfig current = entry.getValue();
+                SubscriptionGroupConfig merged = subGroupConfigMap.get(entry.getKey());
+                if (null != merged) {
+                    // Same group on several masters: retry queue counts add up.
+                    current.setRetryQueueNums(merged.getRetryQueueNums() + current.getRetryQueueNums());
+                }
+                subGroupConfigMap.put(entry.getKey(), current);
+            }
+        }
+
+        Map<String, Object> result = new HashMap<>();
+        result.put("topicConfigTable", topicConfigMap);
+        result.put("subscriptionGroupTable", subGroupConfigMap);
+        result.put("rocketmqVersion", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
+        result.put("exportTime", System.currentTimeMillis());
+
+        writeJson(result, dir + "/metadata.json");
+    }
+
+    /** Serializes {@code content} as pretty-printed JSON into {@code path} and reports success. */
+    private void writeJson(Object content, String path) throws java.io.IOException {
+        MixAll.string2FileNotSafe(JSON.toJSONString(content, true), path);
+        System.out.printf("export %s success%n", path);
+    }
+}
+
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java
new file mode 100644
index 0000000..2b7db0b
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.export;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import com.alibaba.fastjson.JSON;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
+
+public class ExportMetricsCommand implements SubCommand {
+
+    /**
+     * @return the name under which this sub-command is invoked, {@code exportMetrics}
+     */
+    @Override
+    public String commandName() {
+        return "exportMetrics";
+    }
+
+    /**
+     * @return the short, human-readable description of this sub-command
+     */
+    @Override
+    public String commandDesc() {
+        return "export metrics";
+    }
+
+    /**
+     * Registers the options understood by this command: the mandatory cluster name
+     * ({@code -c}) and the optional output directory ({@code -f}).
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance with both options added
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option clusterOption = new Option("c", "clusterName", true, "choose a cluster to export");
+        clusterOption.setRequired(true);
+        options.addOption(clusterOption);
+
+        Option filePathOption = new Option("f", "filePath", true,
+            "export metrics.json path | default /tmp/rocketmq/export");
+        filePathOption.setRequired(false);
+        options.addOption(filePathOption);
+
+        return options;
+    }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+ throws SubCommandException {
+
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ String clusterName = commandLine.getOptionValue('c').trim();
+ String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/export" : commandLine.getOptionValue('f')
+ .trim();
+
+ defaultMQAdminExt.start();
+
+ Map<String, Map<String, Map<String, Object>>> evaluateReportMap = new HashMap<>();
+ Map<String, Double> totalTpsMap = new HashMap<>();
+ Map<String, Long> totalOneDayNumMap = new HashMap<>();
+ initTotalMap(totalTpsMap, totalOneDayNumMap);
+
+ ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
+ Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
+ for (String brokerName : brokerNameSet) {
+ BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+ if (brokerData != null) {
+ String addr = brokerData.getBrokerAddrs().get(0L);
+
+ KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(addr);
+
+ Properties properties = defaultMQAdminExt.getBrokerConfig(addr);
+
+ SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup(addr,
+ 10000);
+
+ Map<String, Map<String, Object>> brokerInfo = new HashMap<>();
+
+ //broker environment,machine configuration
+ brokerInfo.put("runtimeEnv", getRuntimeEnv(kvTable, properties));
+
+ brokerInfo.put("runtimeQuota",
+ getRuntimeQuota(kvTable, defaultMQAdminExt, addr, totalTpsMap,
+ totalOneDayNumMap, subscriptionGroupWrapper));
+
+ // runtime version
+ brokerInfo.put("runtimeVersion",
+ getRuntimeVersion(defaultMQAdminExt, subscriptionGroupWrapper));
+
+ evaluateReportMap.put(brokerName, brokerInfo);
+ }
+
+ }
+
+ String path = filePath + "/metrics.json";
+
+ Map<String, Object> totalData = new HashMap<>();
+ totalData.put("totalTps", totalTpsMap);
+ totalData.put("totalOneDayNum", totalOneDayNumMap);
+
+ Map<String, Object> result = new HashMap<>();
+ result.put("evaluateReport", evaluateReportMap);
+ result.put("totalData", totalData);
+
+ MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path);
+ System.out.printf("export %s success", path);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+
+ }
+
+    /**
+     * Builds the {@code runtimeVersion} section of the report.
+     *
+     * <p>The result holds two entries: {@code rocketmqVersion}, the description of
+     * {@link MQVersion#CURRENT_VERSION} (NOTE(review): this is the constant visible to this
+     * tool, not a value fetched from the broker - confirm that is intended), and
+     * {@code clientInfo}, the distinct {@code language%version} strings of all consumer
+     * connections found for the given subscription groups.
+     *
+     * @param defaultMQAdminExt        started admin client used to look up consumer connections
+     * @param subscriptionGroupWrapper subscription groups whose consumers are inspected
+     * @return a map with the keys {@code rocketmqVersion} and {@code clientInfo}
+     */
+    private Map<String, Object> getRuntimeVersion(DefaultMQAdminExt defaultMQAdminExt,
+        SubscriptionGroupWrapper subscriptionGroupWrapper) {
+        Map<String, Object> runtimeVersionMap = new HashMap<>();
+
+        Set<String> clientInfoSet = new HashSet<>();
+        for (SubscriptionGroupConfig groupConfig : subscriptionGroupWrapper.getSubscriptionGroupTable().values()) {
+            try {
+                ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(
+                    groupConfig.getGroupName());
+                for (Connection conn : cc.getConnectionSet()) {
+                    clientInfoSet.add(conn.getLanguage() + "%" + MQVersion.getVersionDesc(conn.getVersion()));
+                }
+            } catch (Exception ignored) {
+                // Best effort: a group whose connection info cannot be fetched is left out of
+                // the report (presumably groups with no online consumer - confirm).
+            }
+        }
+        runtimeVersionMap.put("rocketmqVersion", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
+        runtimeVersionMap.put("clientInfo", clientInfoSet);
+        return runtimeVersionMap;
+    }
+
+    /**
+     * Builds the {@code runtimeEnv} section of the report for one broker.
+     *
+     * @param kvTable    runtime stats of the broker; supplies {@code totalMemKBytes}
+     * @param properties broker configuration; supplies {@code clientCallbackExecutorThreads},
+     *                   which is reported as {@code cpuNum} (NOTE(review): presumably this
+     *                   property defaults to the CPU count - confirm)
+     * @return a map with the keys {@code cpuNum} and {@code totalMemKBytes}; a value is
+     *         {@code null} when the source does not contain it
+     */
+    private Map<String, Object> getRuntimeEnv(KVTable kvTable, Properties properties) {
+        String cpuNum = properties.getProperty("clientCallbackExecutorThreads");
+        String totalMemKBytes = kvTable.getTable().get("totalMemKBytes");
+
+        Map<String, Object> env = new HashMap<>();
+        env.put("cpuNum", cpuNum);
+        env.put("totalMemKBytes", totalMemKBytes);
+        return env;
+    }
+
+    /**
+     * Builds the {@code runtimeQuota} section for one broker and adds this broker's figures
+     * to the cluster-wide running totals.
+     *
+     * <p>The returned map contains {@code diskRatio}, {@code tps}, {@code oneDayNum},
+     * {@code messageAverageSize}, {@code topicSize} and {@code groupSize}.
+     *
+     * @param kvTable                  runtime stats of the broker
+     * @param defaultMQAdminExt        started admin client
+     * @param brokerAddr               address of the broker to query
+     * @param totalTpsMap              running TPS totals; must already contain the keys set up
+     *                                 by {@code initTotalMap}; updated in place
+     * @param totalOneDayNumMap        running one-day totals; must already contain the keys set
+     *                                 up by {@code initTotalMap}; updated in place
+     * @param subscriptionGroupWrapper subscription groups of the broker, used for {@code groupSize}
+     * @return the quota section for this broker
+     * @throws NumberFormatException if one of the {@code msg*Total*Morning} counters is absent
+     *                               or is not a valid long
+     */
+    private Map<String, Object> getRuntimeQuota(KVTable kvTable, DefaultMQAdminExt defaultMQAdminExt, String brokerAddr,
+        Map<String, Double> totalTpsMap, Map<String, Long> totalOneDayNumMap,
+        SubscriptionGroupWrapper subscriptionGroupWrapper)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig(
+            brokerAddr, false, 10000);
+
+        BrokerStatsData transStatsData = fetchTopicPutStats(defaultMQAdminExt, brokerAddr,
+            TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC);
+        BrokerStatsData scheduleStatsData = fetchTopicPutStats(defaultMQAdminExt, brokerAddr,
+            TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
+
+        Map<String, String> runtimeStats = kvTable.getTable();
+        Map<String, Object> runtimeQuotaMap = new HashMap<>();
+
+        //disk use ratio
+        Map<String, Object> diskRatioMap = new HashMap<>();
+        diskRatioMap.put("commitLogDiskRatio", runtimeStats.get("commitLogDiskRatio"));
+        diskRatioMap.put("consumeQueueDiskRatio", runtimeStats.get("consumeQueueDiskRatio"));
+        runtimeQuotaMap.put("diskRatio", diskRatioMap);
+
+        //inTps and outTps: only the first space-separated figure of each stat is used
+        double normalInTps = parseFirstTps(runtimeStats.get("putTps"));
+        double normalOutTps = parseFirstTps(runtimeStats.get("getTransferedTps"));
+
+        double transInTps = null != transStatsData ? transStatsData.getStatsMinute().getTps() : 0.0;
+        double scheduleInTps = null != scheduleStatsData ? scheduleStatsData.getStatsMinute().getTps() : 0.0;
+
+        long transOneDayInNum = null != transStatsData ? StatsAllSubCommand.compute24HourSum(transStatsData) : 0;
+        long scheduleOneDayInNum = null != scheduleStatsData ? StatsAllSubCommand.compute24HourSum(scheduleStatsData) : 0;
+
+        //current minute tps
+        Map<String, Double> tpsMap = new HashMap<>();
+        tpsMap.put("normalInTps", normalInTps);
+        tpsMap.put("normalOutTps", normalOutTps);
+        tpsMap.put("transInTps", transInTps);
+        tpsMap.put("scheduleInTps", scheduleInTps);
+        runtimeQuotaMap.put("tps", tpsMap);
+
+        //one day num
+        Map<String, Long> oneDayNumMap = new HashMap<>();
+        long normalOneDayInNum = Long.parseLong(runtimeStats.get("msgPutTotalTodayMorning")) -
+            Long.parseLong(runtimeStats.get("msgPutTotalYesterdayMorning"));
+        long normalOneDayOutNum = Long.parseLong(runtimeStats.get("msgGetTotalTodayMorning")) -
+            Long.parseLong(runtimeStats.get("msgGetTotalYesterdayMorning"));
+        oneDayNumMap.put("normalOneDayInNum", normalOneDayInNum);
+        oneDayNumMap.put("normalOneDayOutNum", normalOneDayOutNum);
+        oneDayNumMap.put("transOneDayInNum", transOneDayInNum);
+        oneDayNumMap.put("scheduleOneDayInNum", scheduleOneDayInNum);
+        runtimeQuotaMap.put("oneDayNum", oneDayNumMap);
+
+        //all broker current minute tps
+        totalTpsMap.put("totalNormalInTps", totalTpsMap.get("totalNormalInTps") + normalInTps);
+        totalTpsMap.put("totalNormalOutTps", totalTpsMap.get("totalNormalOutTps") + normalOutTps);
+        totalTpsMap.put("totalTransInTps", totalTpsMap.get("totalTransInTps") + transInTps);
+        totalTpsMap.put("totalScheduleInTps", totalTpsMap.get("totalScheduleInTps") + scheduleInTps);
+
+        //all broker one day num
+        totalOneDayNumMap.put("normalOneDayInNum", totalOneDayNumMap.get("normalOneDayInNum") + normalOneDayInNum);
+        totalOneDayNumMap.put("normalOneDayOutNum", totalOneDayNumMap.get("normalOneDayOutNum") + normalOneDayOutNum);
+        totalOneDayNumMap.put("transOneDayInNum", totalOneDayNumMap.get("transOneDayInNum") + transOneDayInNum);
+        totalOneDayNumMap.put("scheduleOneDayInNum", totalOneDayNumMap.get("scheduleOneDayInNum") + scheduleOneDayInNum);
+
+        // average size of put messages
+        runtimeQuotaMap.put("messageAverageSize", runtimeStats.get("putMessageAverageSize"));
+
+        //topicSize and groupSize
+        runtimeQuotaMap.put("topicSize", topicConfigSerializeWrapper.getTopicConfigTable().size());
+        runtimeQuotaMap.put("groupSize", subscriptionGroupWrapper.getSubscriptionGroupTable().size());
+        return runtimeQuotaMap;
+    }
+
+    /**
+     * Fetches the {@code TOPIC_PUT_NUMS} stats of one topic from a broker, or {@code null}
+     * when the broker call fails with an {@link MQClientException}.
+     */
+    private BrokerStatsData fetchTopicPutStats(DefaultMQAdminExt defaultMQAdminExt, String brokerAddr, String topic)
+        throws RemotingException, InterruptedException {
+        try {
+            return defaultMQAdminExt.viewBrokerStatsData(brokerAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
+        } catch (MQClientException ignored) {
+            // Best effort: the caller treats missing stats as zero traffic
+            // (presumably the broker has no stats for an unused system topic - confirm).
+            return null;
+        }
+    }
+
+    /**
+     * Parses the first space-separated figure of a TPS stat value; returns 0 when the value
+     * is {@code null} or blank.
+     */
+    private static double parseFirstTps(String tpsValues) {
+        if (tpsValues == null) {
+            return 0;
+        }
+        String trimmed = tpsValues.trim();
+        if (trimmed.isEmpty()) {
+            return 0;
+        }
+        return Double.parseDouble(trimmed.split(" ")[0]);
+    }
+
+    /**
+     * Seeds both cluster-wide total maps with a zero entry for every key that is later
+     * accumulated into, so the accumulation can read the current value without a null check.
+     *
+     * @param totalTpsMap       receives the four {@code total*Tps} keys with value {@code 0.0}
+     * @param totalOneDayNumMap receives the four {@code *OneDay*Num} keys with value {@code 0L}
+     */
+    private void initTotalMap(Map<String, Double> totalTpsMap, Map<String, Long> totalOneDayNumMap) {
+        String[] tpsKeys = {"totalNormalInTps", "totalNormalOutTps", "totalTransInTps", "totalScheduleInTps"};
+        for (String tpsKey : tpsKeys) {
+            totalTpsMap.put(tpsKey, 0.0);
+        }
+
+        String[] oneDayKeys = {"normalOneDayInNum", "normalOneDayOutNum", "transOneDayInNum", "scheduleOneDayInNum"};
+        for (String oneDayKey : oneDayKeys) {
+            totalOneDayNumMap.put(oneDayKey, 0L);
+        }
+    }
+}