You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/25 07:12:17 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Test utils for static topic

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 1dacb66  Test utils for static topic
1dacb66 is described below

commit 1dacb667e082c90b80bd4d6c6f9c9bfd6942b0a7
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 25 15:10:50 2021 +0800

    Test utils for static topic
---
 .../common/statictopic/TopicQueueMappingUtils.java | 66 ++++++++++++++++++++--
 .../common/statictopic/TopicMappingUtilsTest.java  | 63 +++++++++++++++++++++
 2 files changed, 123 insertions(+), 6 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 5f843a9..e8c02a7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -234,6 +234,43 @@ public class TopicQueueMappingUtils {
         }
     }
 
+    public static void  checkIfReusePhysicalQueue(Collection<TopicQueueMappingOne> mappingOnes) {
+        Map<String, TopicQueueMappingOne>  physicalQueueIdMap = new HashMap<String, TopicQueueMappingOne>();
+        for (TopicQueueMappingOne mappingOne : mappingOnes) {
+            for (LogicQueueMappingItem item: mappingOne.items) {
+                String physicalQueueId = item.getBname() + "-" + item.getQueueId();
+                if (physicalQueueIdMap.containsKey(physicalQueueId)) {
+                    throw new RuntimeException(String.format("Topic %s global queue id %d and %d shared the same physical queue %s",
+                            mappingOne.topic, mappingOne.globalId, physicalQueueIdMap.get(physicalQueueId).globalId, physicalQueueId));
+                } else {
+                    physicalQueueIdMap.put(physicalQueueId, mappingOne);
+                }
+            }
+        }
+    }
+
+    public static void  checkPhysicalQueueConsistence(Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+        for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+            TopicConfigAndQueueMapping configMapping = entry.getValue();
+            assert configMapping != null;
+            assert configMapping.getMappingDetail() != null;
+            if (configMapping.getReadQueueNums() < configMapping.getWriteQueueNums()) {
+                throw new RuntimeException("Read queues is smaller than write queues");
+            }
+            for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
+                for (LogicQueueMappingItem item: items) {
+                    if (item.getStartOffset() != 0) {
+                        throw new RuntimeException("The start offset dose not begin from 0");
+                    }
+                    TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
+                    if (item.getQueueId() >= topicConfig.getWriteQueueNums()) {
+                        throw new RuntimeException("The physical queue id is overflow the write queues");
+                    }
+                }
+            }
+        }
+    }
+
     public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
         Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
             @Override
@@ -275,6 +312,7 @@ public class TopicQueueMappingUtils {
                 }
             }
         }
+        checkIfReusePhysicalQueue(globalIdMap.values());
         return globalIdMap;
     }
 
@@ -312,12 +350,23 @@ public class TopicQueueMappingUtils {
         }
     }
 
+    public static void checkIfTargetBrokersComplete(Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+        for (String broker : brokerConfigMap.keySet()) {
+            if (!targetBrokers.contains(broker)) {
+                throw new RuntimeException("The existed broker " + broker + " dose not in target brokers ");
+            }
+        }
+    }
+
     public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+        checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
         Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
         Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
         if (!brokerConfigMap.isEmpty()) {
             maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
             globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+            checkIfReusePhysicalQueue(globalIdMap.values());
+            checkPhysicalQueueConsistence(brokerConfigMap);
         }
         if (queueNum < globalIdMap.size()) {
             throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
@@ -377,9 +426,12 @@ public class TopicQueueMappingUtils {
             configMapping.getMappingDetail().setTotalQueues(queueNum);
         }
         //double check the config
-        TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
-        TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
-
+        {
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+            globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+            checkIfReusePhysicalQueue(globalIdMap.values());
+            checkPhysicalQueueConsistence(brokerConfigMap);
+        }
         return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, brokerConfigMap, new HashSet<String>(), new HashSet<String>());
     }
 
@@ -454,7 +506,7 @@ public class TopicQueueMappingUtils {
 
             List<LogicQueueMappingItem> items = new ArrayList<LogicQueueMappingItem>(topicQueueMappingOne.getItems());
             LogicQueueMappingItem last = items.get(items.size() - 1);
-            items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
+            items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, 0, 0, -1, -1, -1));
 
             ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
             //Use the same object
@@ -469,8 +521,10 @@ public class TopicQueueMappingUtils {
         }
 
         //double check
-        TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
-        TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+        {
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        }
+
 
         return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut);
     }
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
index 6a61de5..c82a690 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
@@ -1,14 +1,28 @@
 package org.apache.rocketmq.common.statictopic;
 
+import org.apache.rocketmq.common.TopicConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class TopicMappingUtilsTest {
 
 
+
+    private Set<String> buildTargetBrokers(int num) {
+        Set<String> brokers = new HashSet<String>();
+        for (int i = 0; i < num; i++) {
+            brokers.add("broker" + i);
+        }
+        return brokers;
+    }
+
     private Map<String, Integer> buildBrokerNumMap(int num) {
         Map<String, Integer> map = new HashMap<String, Integer>();
         for (int i = 0; i < num; i++) {
@@ -58,4 +72,53 @@ public class TopicMappingUtilsTest {
             testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap());
         }
     }
+
+
+    @Test(expected = RuntimeException.class)
+    public void testTargetBrokersComplete() {
+        String topic = "static";
+        String broker1 = "broker1";
+        String broker2 = "broker2";
+        Set<String> targetBrokers = new HashSet<String>();
+        targetBrokers.add(broker1);
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+        brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, 0, broker2, 0)));
+        TopicQueueMappingUtils.checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
+    }
+
+
+
+    @Test
+    public void testCreateStaticTopic() {
+        String topic = "static";
+        int queueNum;
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+        for (int i = 1; i < 10; i++) {
+            Set<String> targetBrokers = buildTargetBrokers(2 * i);
+            queueNum = 10 * i;
+            TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+            Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
+            Assert.assertEquals(2 * i, brokerConfigMap.size());
+
+            //do the check manually
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+            Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+            TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
+            TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
+
+            for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+                TopicConfigAndQueueMapping configMapping = entry.getValue();
+                Assert.assertEquals(5, configMapping.getReadQueueNums());
+                Assert.assertEquals(5, configMapping.getWriteQueueNums());
+                for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
+                    for (LogicQueueMappingItem item: items) {
+                        Assert.assertEquals(0, item.getStartOffset());
+                        Assert.assertEquals(0, item.getLogicOffset());
+                        TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
+                        Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums());
+                    }
+                }
+            }
+        }
+    }
 }