You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/25 12:24:25 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Fix the stability of remapping

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 2ee5897  Fix the stability of remapping
2ee5897 is described below

commit 2ee5897177e0f37dcbe1aa4985934c972a9d691d
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 25 20:23:59 2021 +0800

    Fix the stability of remapping
---
 .../statictopic/TopicQueueMappingDetail.java       |  2 +
 .../common/statictopic/TopicQueueMappingUtils.java | 70 ++++++++++++++---
 .../common/statictopic/TopicMappingUtilsTest.java  | 90 ++++++++++++++++++----
 .../apache/rocketmq/test/smoke/StaticTopicIT.java  |  3 +-
 .../command/topic/UpdateStaticTopicSubCommand.java |  2 +-
 5 files changed, 139 insertions(+), 28 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 30db209..86a6cec 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -39,6 +39,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
         super(topic, totalQueues, bname, epoch);
     }
 
+
+
     public static boolean putMappingInfo(TopicQueueMappingDetail mappingDetail, Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
         if (mappingInfo.isEmpty()) {
             return true;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index ea6bc61..ef565a0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -39,12 +39,15 @@ public class TopicQueueMappingUtils {
     public static class MappingAllocator {
         Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
         Map<Integer, String> idToBroker = new HashMap<Integer, String>();
+        //used for remapping
+        Map<String, Integer> brokerNumMapBeforeRemapping = null;
         int currentIndex = 0;
         Random random = new Random();
         List<String> leastBrokers = new ArrayList<String>();
-        private MappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
+        private MappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap, Map<String, Integer> brokerNumMapBeforeRemapping) {
             this.idToBroker.putAll(idToBroker);
             this.brokerNumMap.putAll(brokerNumMap);
+            this.brokerNumMapBeforeRemapping = brokerNumMapBeforeRemapping;
         }
 
         private void freshState() {
@@ -58,7 +61,27 @@ public class TopicQueueMappingUtils {
                     leastBrokers.add(entry.getKey());
                 }
             }
-            currentIndex = random.nextInt(leastBrokers.size());
+            //sort the least-loaded brokers by their queue count before remapping, to reduce the remapping
+            if (brokerNumMapBeforeRemapping != null
+                    && !brokerNumMapBeforeRemapping.isEmpty()) {
+                Collections.sort(leastBrokers, new Comparator<String>() {
+                    @Override
+                    public int compare(String o1, String o2) {
+                        int i1 = 0, i2 = 0;
+                        if (brokerNumMapBeforeRemapping.containsKey(o1)) {
+                            i1 = brokerNumMapBeforeRemapping.get(o1);
+                        }
+                        if (brokerNumMapBeforeRemapping.containsKey(o2)) {
+                            i2 = brokerNumMapBeforeRemapping.get(o2);
+                        }
+                        return i1 - i2;
+                    }
+                });
+            } else {
+                //shuffle the least-loaded brokers so the pick order is random, to reduce the imbalance
+                Collections.shuffle(leastBrokers);
+            }
+            currentIndex = leastBrokers.size() - 1;
         }
         private String nextBroker() {
             if (leastBrokers.isEmpty()) {
@@ -93,8 +116,9 @@ public class TopicQueueMappingUtils {
         }
     }
 
-    public static MappingAllocator buildMappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
-        return new MappingAllocator(idToBroker, brokerNumMap);
+
+    public static MappingAllocator buildMappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap, Map<String, Integer> brokerNumMapBeforeRemapping) {
+        return new MappingAllocator(idToBroker, brokerNumMap, brokerNumMapBeforeRemapping);
     }
 
     public static Map.Entry<Long, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
@@ -367,16 +391,28 @@ public class TopicQueueMappingUtils {
         }
     }
 
-    public static void checkIfTargetBrokersComplete(Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+    public static void checkTargetBrokersComplete(Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
         for (String broker : brokerConfigMap.keySet()) {
+            if (brokerConfigMap.get(broker).getMappingDetail().getHostedQueues().isEmpty()) {
+                continue;
+            }
             if (!targetBrokers.contains(broker)) {
                 throw new RuntimeException("The existed broker " + broker + " dose not in target brokers ");
             }
         }
     }
 
-    public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
-        checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
+    public static void checkNonTargetBrokers(Set<String> targetBrokers, Set<String> nonTargetBrokers) {
+        for (String broker : nonTargetBrokers) {
+            if (targetBrokers.contains(broker)) {
+                throw new RuntimeException("The non-target broker exist in target broker");
+            }
+        }
+    }
+
+    public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Set<String> nonTargetBrokers,  Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+        checkTargetBrokersComplete(targetBrokers, brokerConfigMap);
+        checkNonTargetBrokers(targetBrokers, nonTargetBrokers);
         Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
         Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
         if (!brokerConfigMap.isEmpty()) {
@@ -408,7 +444,7 @@ public class TopicQueueMappingUtils {
                 brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
             }
         }
-        TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
+        TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap, null);
         allocator.upToNum(queueNum);
         Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
 
@@ -436,6 +472,12 @@ public class TopicQueueMappingUtils {
             TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(mappingItem)));
         }
 
+        //set the non target brokers
+        for (String broker : nonTargetBrokers) {
+            if (!brokerConfigMap.containsKey(broker)) {
+                brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch)));
+            }
+        }
         // set the topic config
         for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
             TopicConfigAndQueueMapping configMapping = entry.getValue();
@@ -466,10 +508,20 @@ public class TopicQueueMappingUtils {
         for (String broker: targetBrokers) {
             brokerNumMap.put(broker, 0);
         }
-        TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
+        Map<String, Integer> brokerNumMapBeforeRemapping = new HashMap<String, Integer>();
+        for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
+            if(brokerNumMapBeforeRemapping.containsKey(mappingOne.bname)) {
+                brokerNumMapBeforeRemapping.put(mappingOne.bname, brokerNumMapBeforeRemapping.get(mappingOne.bname) + 1);
+            } else {
+                brokerNumMapBeforeRemapping.put(mappingOne.bname, 1);
+            }
+        }
+
+        TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap, brokerNumMapBeforeRemapping);
         allocator.upToNum(maxNum);
         Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
         Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
+        //cannot directly use the idToBroker map from the allocator, because the globalIds may not be in natural order
         Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
         //the following logic will make sure that, for one broker, either "map in" or "map out"
         //It can't both,  map in some queues but also map out some queues.
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
index 1b0ad54..bd4b13c 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
@@ -10,6 +10,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
 public class TopicMappingUtilsTest {
@@ -35,6 +36,18 @@ public class TopicMappingUtilsTest {
         return map;
     }
 
+    private Map<String, Integer> buildBrokerNumMap(int num, int queues) {
+        Map<String, Integer> map = new HashMap<String, Integer>();
+        int random = new Random().nextInt(num);
+        for (int i = 0; i < num; i++) {
+            map.put("broker" + i, queues);
+            if (i == random) {
+                map.put("broker" + i, queues + 1);
+            }
+        }
+        return map;
+    }
+
     private void testIdToBroker(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
         Map<String, Integer> brokerNumOther = new HashMap<String, Integer>();
         for (int i = 0; i < idToBroker.size(); i++) {
@@ -58,7 +71,7 @@ public class TopicMappingUtilsTest {
         for (int i = 0; i < 10; i++) {
             int num = 3;
             Map<String, Integer> brokerNumMap = buildBrokerNumMap(num);
-            TopicQueueMappingUtils.MappingAllocator  allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
+            TopicQueueMappingUtils.MappingAllocator  allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap,  null);
             allocator.upToNum(num * 2);
             for (Map.Entry<String, Integer> entry: allocator.getBrokerNumMap().entrySet()) {
                 Assert.assertEquals(2L, entry.getValue().longValue());
@@ -77,6 +90,18 @@ public class TopicMappingUtilsTest {
         }
     }
 
+    @Test
+    public void testRemappingAllocator() {
+        for (int i = 0; i < 10; i++) {
+            int num = (i + 2) * 2;
+            Map<String, Integer> brokerNumMap = buildBrokerNumMap(num);
+            Map<String, Integer> brokerNumMapBeforeRemapping = buildBrokerNumMap(num, num);
+            TopicQueueMappingUtils.MappingAllocator  allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap, brokerNumMapBeforeRemapping);
+            allocator.upToNum(num * num + 1);
+            Assert.assertEquals(brokerNumMapBeforeRemapping, allocator.getBrokerNumMap());
+        }
+    }
+
 
     @Test(expected = RuntimeException.class)
     public void testTargetBrokersComplete() {
@@ -86,8 +111,10 @@ public class TopicMappingUtilsTest {
         Set<String> targetBrokers = new HashSet<String>();
         targetBrokers.add(broker1);
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
-        brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, 0, broker2, 0)));
-        TopicQueueMappingUtils.checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
+        TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker2, 0);
+        mappingDetail.getHostedQueues().put(1, new ArrayList<LogicQueueMappingItem>());
+        brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), mappingDetail));
+        TopicQueueMappingUtils.checkTargetBrokersComplete(targetBrokers, brokerConfigMap);
     }
 
 
@@ -99,28 +126,36 @@ public class TopicMappingUtilsTest {
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
         for (int i = 1; i < 10; i++) {
             Set<String> targetBrokers = buildTargetBrokers(2 * i);
+            Set<String> nonTargetBrokers = buildTargetBrokers(2 * i, "test");
             queueNum = 10 * i;
-            TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+            TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, nonTargetBrokers, brokerConfigMap);
             Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
-            Assert.assertEquals(2 * i, brokerConfigMap.size());
+            Assert.assertEquals(4 * i, brokerConfigMap.size());
 
             //do the check manually
-            TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
+            Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
+            Assert.assertEquals(queueNum, maxEpochAndNum.getValue().longValue());
             Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
             TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
             TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
 
             for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
                 TopicConfigAndQueueMapping configMapping = entry.getValue();
-                Assert.assertEquals(5, configMapping.getReadQueueNums());
-                Assert.assertEquals(5, configMapping.getWriteQueueNums());
-                Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis());
-                for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
-                    for (LogicQueueMappingItem item: items) {
-                        Assert.assertEquals(0, item.getStartOffset());
-                        Assert.assertEquals(0, item.getLogicOffset());
-                        TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
-                        Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums());
+                if (nonTargetBrokers.contains(configMapping.getMappingDetail().bname)) {
+                    Assert.assertEquals(0, configMapping.getReadQueueNums());
+                    Assert.assertEquals(0, configMapping.getWriteQueueNums());
+                    Assert.assertEquals(0, configMapping.getMappingDetail().getHostedQueues().size());
+                } else {
+                    Assert.assertEquals(5, configMapping.getReadQueueNums());
+                    Assert.assertEquals(5, configMapping.getWriteQueueNums());
+                    Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis());
+                    for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
+                        for (LogicQueueMappingItem item: items) {
+                            Assert.assertEquals(0, item.getStartOffset());
+                            Assert.assertEquals(0, item.getLogicOffset());
+                            TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
+                            Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums());
+                        }
                     }
                 }
             }
@@ -133,7 +168,7 @@ public class TopicMappingUtilsTest {
         int queueNum = 7;
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
         Set<String>  originalBrokers = buildTargetBrokers(2);
-        TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap);
+        TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap);
         Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
         Assert.assertEquals(2, brokerConfigMap.size());
 
@@ -171,6 +206,27 @@ public class TopicMappingUtilsTest {
         }
     }
 
+    @Test
+    public void testRemappingStaticTopicStability() {
+        String topic = "static";
+        int queueNum = 7;
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+        Set<String>  originalBrokers = buildTargetBrokers(2);
+        {
+            TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap);
+            Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
+            Assert.assertEquals(2, brokerConfigMap.size());
+        }
+        for (int i = 0; i < 10; i++) {
+            TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, originalBrokers);
+            Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
+            Assert.assertEquals(2, brokerConfigMap.size());
+            Assert.assertTrue(wrapper.getBrokerToMapIn().isEmpty());
+            Assert.assertTrue(wrapper.getBrokerToMapOut().isEmpty());
+        }
+    }
+
+
 
     @Test
     public void testUtilsCheck() {
@@ -178,7 +234,7 @@ public class TopicMappingUtilsTest {
         int queueNum = 10;
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
         Set<String> targetBrokers = buildTargetBrokers(2);
-        TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+        TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<String>(), brokerConfigMap);
         Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
         Assert.assertEquals(2, brokerConfigMap.size());
         TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index a5656f3..a4fa864 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -16,6 +16,7 @@ import org.junit.FixMethodOrder;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -45,7 +46,7 @@ public class StaticTopicIT extends BaseConf {
     public Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers) throws Exception {
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
         Assert.assertTrue(brokerConfigMap.isEmpty());
-        TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+        TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
         Assert.assertEquals(2, brokerConfigMap.size());
         //If some succeed, and others fail, it will cause inconsistent data
         for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 85e2cc5..d6a680a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -217,7 +217,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             }
 
             //calculate the new data
-            TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+            TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
 
             {
                 String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);