You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 09:37:14 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the static topic commands

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new f05cfb9  Polish the static topic commands
f05cfb9 is described below

commit f05cfb9295bc8439ee65f5d364e5f47ab7330e65
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 17:36:58 2021 +0800

    Polish the static topic commands
---
 .../topic/RemappingStaticTopicSubCommand.java      |  1 +
 .../command/topic/UpdateStaticTopicSubCommand.java | 70 ++++++++++++----------
 2 files changed, 39 insertions(+), 32 deletions(-)

diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 656396c..3b1b27a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -244,6 +244,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
             {
                 TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
                 clientMetadata.freshTopicRoute(topic, routeData);
+
                 if (routeData != null
                         && !routeData.getQueueDatas().isEmpty()) {
                     for (QueueData queueData: routeData.getQueueDatas()) {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index cdfdcfe..6184290 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -112,13 +112,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 throw new RuntimeException("The Cluster info is empty");
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
-            //If some succeed, and others fail, it will cause inconsistent data
-            for (Map.Entry<String, TopicConfigAndQueueMapping> entry : wrapper.getBrokerConfigMap().entrySet()) {
-                String broker = entry.getKey();
-                String addr = clientMetadata.findMasterBrokerAddr(broker);
-                TopicConfigAndQueueMapping configMapping = entry.getValue();
-                defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
-            }
+            doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
             return;
         }catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -127,6 +121,16 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
         }
     }
 
+    public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+        //If some succeed, and others fail, it will cause inconsistent data
+        for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+            String broker = entry.getKey();
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = entry.getValue();
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+        }
+    }
+
 
     @Override
     public void execute(final CommandLine commandLine, final Options options,
@@ -147,15 +151,15 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
         Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
-        Set<String> brokers = new HashSet<>();
+        Set<String> targetBrokers = new HashSet<>();
 
         try {
             if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
-                    || !commandLine.hasOption('t')
                     || !commandLine.hasOption("qn")) {
                 ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
                 return;
             }
+            String topic = commandLine.getOptionValue('t').trim();
 
             ClusterInfo clusterInfo  = defaultMQAdminExt.examineBrokerClusterInfo();
             if (clusterInfo == null
@@ -163,26 +167,33 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 throw new RuntimeException("The Cluster info is empty");
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
-
-            String clusters = commandLine.getOptionValue('c').trim();
-            for (String cluster : clusters.split(",")) {
-                cluster = cluster.trim();
-                if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
-                    brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+            {
+                if (commandLine.hasOption("b")) {
+                    String brokerStrs = commandLine.getOptionValue("b").trim();
+                    for (String broker: brokerStrs.split(",")) {
+                        targetBrokers.add(broker.trim());
+                    }
+                } else if (commandLine.hasOption("c")) {
+                    String clusters = commandLine.getOptionValue('c').trim();
+                    for (String cluster : clusters.split(",")) {
+                        cluster = cluster.trim();
+                        if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
+                            targetBrokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+                        }
+                    }
                 }
-            }
-            if (brokers.isEmpty()) {
-                throw new RuntimeException("Find none brokers for " + clusters);
-            }
-            for (String broker : brokers) {
-                String addr = clientMetadata.findMasterBrokerAddr(broker);
-                if (addr == null) {
-                    throw new RuntimeException("Can't find addr for broker " + broker);
+                if (targetBrokers.isEmpty()) {
+                    throw new RuntimeException("Find none brokers, do nothing");
+                }
+                for (String broker : targetBrokers) {
+                    String addr = clientMetadata.findMasterBrokerAddr(broker);
+                    if (addr == null) {
+                        throw new RuntimeException("Can't find addr for broker " + broker);
+                    }
                 }
             }
 
             //get the existed topic config and mapping
-            String topic = commandLine.getOptionValue('t').trim();
             int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
             {
                 TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
@@ -223,7 +234,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
 
             //the check is ok, now do the mapping allocation
-            Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
+            Map<String, Integer> brokerNumMap = targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
             final Map<Integer, String> oldIdToBroker = new HashMap<>();
             globalIdMap.forEach((key, value) -> {
                 String leaderbroker = value.getBname();
@@ -277,13 +288,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 System.out.println("The new mapping data is written to file " + newMappingDataFile);
             }
 
-            //If some succeed, and others fail, it will cause inconsistent data
-            for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
-                String broker = entry.getKey();
-                String addr = clientMetadata.findMasterBrokerAddr(broker);
-                TopicConfigAndQueueMapping configMapping = entry.getValue();
-                defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
-            }
+            doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt);
+
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
         } finally {