You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/25 11:26:09 UTC

[rocketmq] 01/02: Finish the test for utils of createStaticTopic

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit a172c769b838e721233eed26ee59e2b688787f38
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 25 15:45:03 2021 +0800

    Finish the test for utils of createStaticTopic
---
 .../common/statictopic/TopicQueueMappingUtils.java | 12 ++--
 .../common/statictopic/TopicMappingUtilsTest.java  | 75 ++++++++++++++++++++++
 2 files changed, 82 insertions(+), 5 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index e8c02a7..d1d81c6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -263,6 +263,9 @@ public class TopicQueueMappingUtils {
                         throw new RuntimeException("The start offset dose not begin from 0");
                     }
                     TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
+                    if (topicConfig == null) {
+                        throw new RuntimeException("The broker dose not exist");
+                    }
                     if (item.getQueueId() >= topicConfig.getWriteQueueNums()) {
                         throw new RuntimeException("The physical queue id is overflow the write queues");
                     }
@@ -406,7 +409,7 @@ public class TopicQueueMappingUtils {
             }
             TopicConfigAndQueueMapping configMapping;
             if (!brokerConfigMap.containsKey(broker)) {
-                configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
+                configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, System.currentTimeMillis()));
                 configMapping.setWriteQueueNums(1);
                 configMapping.setReadQueueNums(1);
                 brokerConfigMap.put(broker, configMapping);
@@ -416,7 +419,7 @@ public class TopicQueueMappingUtils {
                 configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
             }
             LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
-            TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, ImmutableList.of(mappingItem));
+            TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(mappingItem)));
         }
 
         // set the topic config
@@ -508,10 +511,9 @@ public class TopicQueueMappingUtils {
             LogicQueueMappingItem last = items.get(items.size() - 1);
             items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, 0, 0, -1, -1, -1));
 
-            ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
             //Use the same object
-            TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, resultItems);
-            TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, resultItems);
+            TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, items);
+            TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, items);
         }
 
         for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
index c82a690..14e4c0f 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
@@ -5,6 +5,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -110,6 +111,7 @@ public class TopicMappingUtilsTest {
                 TopicConfigAndQueueMapping configMapping = entry.getValue();
                 Assert.assertEquals(5, configMapping.getReadQueueNums());
                 Assert.assertEquals(5, configMapping.getWriteQueueNums());
+                Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis());
                 for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
                     for (LogicQueueMappingItem item: items) {
                         Assert.assertEquals(0, item.getStartOffset());
@@ -121,4 +123,77 @@ public class TopicMappingUtilsTest {
             }
         }
     }
+
+    @Test
+    public void testCreateStaticTopic_Error() {
+        String topic = "static";
+        int queueNum = 10;
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+        Set<String> targetBrokers = buildTargetBrokers(2);
+        TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+        Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
+        Assert.assertEquals(2, brokerConfigMap.size());
+        TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
+        List<LogicQueueMappingItem> items = configMapping.getMappingDetail().getHostedQueues().values().iterator().next();
+        Map.Entry<Long, Integer> maxEpochNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        int exceptionNum = 0;
+        try {
+            configMapping.getMappingDetail().setTopic("xxxx");
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        } catch (RuntimeException ignore) {
+            exceptionNum++;
+            configMapping.getMappingDetail().setTopic(topic);
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        }
+
+        try {
+            configMapping.getMappingDetail().setTotalQueues(1);
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        } catch (RuntimeException ignore) {
+            exceptionNum++;
+            configMapping.getMappingDetail().setTotalQueues(10);
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        }
+
+        try {
+            configMapping.getMappingDetail().setEpoch(0);
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        } catch (RuntimeException ignore) {
+            exceptionNum++;
+            configMapping.getMappingDetail().setEpoch(maxEpochNum.getKey());
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        }
+
+
+        try {
+            configMapping.getMappingDetail().getHostedQueues().put(10000, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(new LogicQueueMappingItem(1, 1, targetBrokers.iterator().next(), 0, 0, -1, -1, -1))));
+            TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
+        } catch (RuntimeException ignore) {
+            exceptionNum++;
+            configMapping.getMappingDetail().getHostedQueues().remove(10000);
+            TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
+        }
+
+        try {
+            configMapping.setWriteQueueNums(1);
+            TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
+        } catch (RuntimeException ignore) {
+            exceptionNum++;
+            configMapping.setWriteQueueNums(5);
+            TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
+        }
+
+        try {
+            items.add(new LogicQueueMappingItem(1, 1, targetBrokers.iterator().next(), 0, 0, -1, -1, -1));
+            Map<Integer, TopicQueueMappingOne> map = TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
+            TopicQueueMappingUtils.checkIfReusePhysicalQueue(map.values());
+        } catch (RuntimeException ignore) {
+            exceptionNum++;
+            items.remove(items.size() - 1);
+            Map<Integer, TopicQueueMappingOne> map = TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
+            TopicQueueMappingUtils.checkIfReusePhysicalQueue(map.values());
+        }
+        Assert.assertEquals(6, exceptionNum);
+
+    }
 }