You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 06:37:25 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the remapping logic

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new e9cafe9  Polish the remapping logic
e9cafe9 is described below

commit e9cafe9fef748b92ea4d020e16bb95192ffaadff
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 14:36:47 2021 +0800

    Polish the remapping logic
---
 .../topic/RemappingStaticTopicSubCommand.java      | 63 +++++++++++++---------
 .../command/topic/UpdateStaticTopicSubCommand.java |  2 +-
 2 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 4cc5acf..34aec17 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -194,6 +194,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
 
             //the check is ok, now do the mapping allocation
             int maxNum = maxEpochAndNum.getValue();
+            long maxEpoch = maxEpochAndNum.getKey();
             TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
             allocator.upToNum(maxNum);
             Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
@@ -217,40 +218,50 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
             });
             expectedBrokerNumMap.forEach((broker, queueNum) -> {
                 for (int i = 0; i < queueNum; i++) {
-                    expectedIdToBroker.put(waitAssignQueues.poll(), broker);
+                    Integer queueId = waitAssignQueues.poll();
+                    assert queueId != null;
+                    expectedIdToBroker.put(queueId, broker);
                 }
             });
 
-            Set<Broker>
-
             //Now construct the remapping info
-
-            //construct the topic configAndMapping
-            long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
-            for (Map.Entry<Integer, String> e : expectedIdToBroker.entrySet()) {
-                Integer queueId = e.getKey();
-                String broker = e.getValue();
-                if (globalIdMap.containsKey(queueId)) {
-                    //ignore the exited
+            Set<String> brokersToMapOut = new HashSet<>();
+            Set<String> brokersToMapIn = new HashSet<>();
+            for (Map.Entry<Integer, String> mapEntry : expectedIdToBroker.entrySet()) {
+                Integer queueId = mapEntry.getKey();
+                String broker = mapEntry.getValue();
+                TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId);
+                assert topicQueueMappingOne != null;
+                if (topicQueueMappingOne.getBname().equals(broker)) {
                     continue;
                 }
-                TopicConfigAndQueueMapping configMapping;
-                if (!existedTopicConfigMap.containsKey(broker)) {
-                    TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
-                    TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker, epoch);
-                    configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
-                    existedTopicConfigMap.put(broker, configMapping);
-                } else {
-                    configMapping = existedTopicConfigMap.get(broker);
-                    configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
-                    configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
-                    configMapping.getMappingDetail().setEpoch(epoch);
-                    configMapping.getMappingDetail().setTotalQueues(0);
-                }
-                LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
-                configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
+                //remapping
+                String mapInBroker = broker;
+                String mapOutBroker = topicQueueMappingOne.getBname();
+                brokersToMapIn.add(mapInBroker);
+                brokersToMapOut.add(mapOutBroker);
+                TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(mapInBroker);
+                TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(mapOutBroker);
+
+                mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
+                mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
+
+                List<LogicQueueMappingItem> items = new ArrayList<>(topicQueueMappingOne.getItems());
+                LogicQueueMappingItem last = items.get(items.size() - 1);
+                items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
+
+                ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
+                mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
+                mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
             }
 
+            long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
+            existedTopicConfigMap.values().forEach( configMapping -> {
+                configMapping.getMappingDetail().setEpoch(epoch);
+                configMapping.getMappingDetail().setTotalQueues(maxNum);
+            });
+            //decide the new offset
+
             //If some succeed, and others fail, it will cause inconsistent data
             for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
                 String broker = entry.getKey();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index a1ff0b0..278ea9f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -226,7 +226,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 } else {
                     configMapping = existedTopicConfigMap.get(broker);
                     configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
-                    configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
+                    configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
                 }
                 LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
                 configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));