You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 09:37:14 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the static topic commands
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new f05cfb9 Polish the static topic commands
f05cfb9 is described below
commit f05cfb9295bc8439ee65f5d364e5f47ab7330e65
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 17:36:58 2021 +0800
Polish the static topic commands
---
.../topic/RemappingStaticTopicSubCommand.java | 1 +
.../command/topic/UpdateStaticTopicSubCommand.java | 70 ++++++++++++----------
2 files changed, 39 insertions(+), 32 deletions(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 656396c..3b1b27a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -244,6 +244,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
+
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index cdfdcfe..6184290 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -112,13 +112,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
- //If some succeed, and others fail, it will cause inconsistent data
- for (Map.Entry<String, TopicConfigAndQueueMapping> entry : wrapper.getBrokerConfigMap().entrySet()) {
- String broker = entry.getKey();
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = entry.getValue();
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
- }
+ doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -127,6 +121,16 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
}
+ public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+ //If some succeed, and others fail, it will cause inconsistent data
+ for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+ String broker = entry.getKey();
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = entry.getValue();
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ }
+ }
+
@Override
public void execute(final CommandLine commandLine, final Options options,
@@ -147,15 +151,15 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
- Set<String> brokers = new HashSet<>();
+ Set<String> targetBrokers = new HashSet<>();
try {
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
- || !commandLine.hasOption('t')
|| !commandLine.hasOption("qn")) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
+ String topic = commandLine.getOptionValue('t').trim();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
@@ -163,26 +167,33 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
-
- String clusters = commandLine.getOptionValue('c').trim();
- for (String cluster : clusters.split(",")) {
- cluster = cluster.trim();
- if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
- brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+ {
+ if (commandLine.hasOption("b")) {
+ String brokerStrs = commandLine.getOptionValue("b").trim();
+ for (String broker: brokerStrs.split(",")) {
+ targetBrokers.add(broker.trim());
+ }
+ } else if (commandLine.hasOption("c")) {
+ String clusters = commandLine.getOptionValue('c').trim();
+ for (String cluster : clusters.split(",")) {
+ cluster = cluster.trim();
+ if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
+ targetBrokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+ }
+ }
}
- }
- if (brokers.isEmpty()) {
- throw new RuntimeException("Find none brokers for " + clusters);
- }
- for (String broker : brokers) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- if (addr == null) {
- throw new RuntimeException("Can't find addr for broker " + broker);
+ if (targetBrokers.isEmpty()) {
+ throw new RuntimeException("Find none brokers, do nothing");
+ }
+ for (String broker : targetBrokers) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ if (addr == null) {
+ throw new RuntimeException("Can't find addr for broker " + broker);
+ }
}
}
//get the existed topic config and mapping
- String topic = commandLine.getOptionValue('t').trim();
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
@@ -223,7 +234,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
//the check is ok, now do the mapping allocation
- Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
+ Map<String, Integer> brokerNumMap = targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
final Map<Integer, String> oldIdToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> {
String leaderbroker = value.getBname();
@@ -277,13 +288,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
System.out.println("The new mapping data is written to file " + newMappingDataFile);
}
- //If some succeed, and others fail, it will cause inconsistent data
- for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
- String broker = entry.getKey();
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = entry.getValue();
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
- }
+ doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt);
+
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {