You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/25 11:26:09 UTC
[rocketmq] 01/02: Finish the test for utils of createStaticTopic
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit a172c769b838e721233eed26ee59e2b688787f38
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 25 15:45:03 2021 +0800
Finish the test for utils of createStaticTopic
---
.../common/statictopic/TopicQueueMappingUtils.java | 12 ++--
.../common/statictopic/TopicMappingUtilsTest.java | 75 ++++++++++++++++++++++
2 files changed, 82 insertions(+), 5 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index e8c02a7..d1d81c6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -263,6 +263,9 @@ public class TopicQueueMappingUtils {
throw new RuntimeException("The start offset dose not begin from 0");
}
TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
+ if (topicConfig == null) {
+ throw new RuntimeException("The broker dose not exist");
+ }
if (item.getQueueId() >= topicConfig.getWriteQueueNums()) {
throw new RuntimeException("The physical queue id is overflow the write queues");
}
@@ -406,7 +409,7 @@ public class TopicQueueMappingUtils {
}
TopicConfigAndQueueMapping configMapping;
if (!brokerConfigMap.containsKey(broker)) {
- configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
+ configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, System.currentTimeMillis()));
configMapping.setWriteQueueNums(1);
configMapping.setReadQueueNums(1);
brokerConfigMap.put(broker, configMapping);
@@ -416,7 +419,7 @@ public class TopicQueueMappingUtils {
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
- TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, ImmutableList.of(mappingItem));
+ TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(mappingItem)));
}
// set the topic config
@@ -508,10 +511,9 @@ public class TopicQueueMappingUtils {
LogicQueueMappingItem last = items.get(items.size() - 1);
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, 0, 0, -1, -1, -1));
- ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object
- TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, resultItems);
- TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, resultItems);
+ TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, items);
+ TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, items);
}
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
index c82a690..14e4c0f 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
@@ -5,6 +5,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -110,6 +111,7 @@ public class TopicMappingUtilsTest {
TopicConfigAndQueueMapping configMapping = entry.getValue();
Assert.assertEquals(5, configMapping.getReadQueueNums());
Assert.assertEquals(5, configMapping.getWriteQueueNums());
+ Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis());
for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
for (LogicQueueMappingItem item: items) {
Assert.assertEquals(0, item.getStartOffset());
@@ -121,4 +123,77 @@ public class TopicMappingUtilsTest {
}
}
}
+
+ @Test
+ public void testCreateStaticTopic_Error() {
+ String topic = "static";
+ int queueNum = 10;
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+ Set<String> targetBrokers = buildTargetBrokers(2);
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+ Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
+ Assert.assertEquals(2, brokerConfigMap.size());
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
+ List<LogicQueueMappingItem> items = configMapping.getMappingDetail().getHostedQueues().values().iterator().next();
+ Map.Entry<Long, Integer> maxEpochNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ int exceptionNum = 0;
+ try {
+ configMapping.getMappingDetail().setTopic("xxxx");
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ } catch (RuntimeException ignore) {
+ exceptionNum++;
+ configMapping.getMappingDetail().setTopic(topic);
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ }
+
+ try {
+ configMapping.getMappingDetail().setTotalQueues(1);
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ } catch (RuntimeException ignore) {
+ exceptionNum++;
+ configMapping.getMappingDetail().setTotalQueues(10);
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ }
+
+ try {
+ configMapping.getMappingDetail().setEpoch(0);
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ } catch (RuntimeException ignore) {
+ exceptionNum++;
+ configMapping.getMappingDetail().setEpoch(maxEpochNum.getKey());
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ }
+
+
+ try {
+ configMapping.getMappingDetail().getHostedQueues().put(10000, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(new LogicQueueMappingItem(1, 1, targetBrokers.iterator().next(), 0, 0, -1, -1, -1))));
+ TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
+ } catch (RuntimeException ignore) {
+ exceptionNum++;
+ configMapping.getMappingDetail().getHostedQueues().remove(10000);
+ TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
+ }
+
+ try {
+ configMapping.setWriteQueueNums(1);
+ TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
+ } catch (RuntimeException ignore) {
+ exceptionNum++;
+ configMapping.setWriteQueueNums(5);
+ TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
+ }
+
+ try {
+ items.add(new LogicQueueMappingItem(1, 1, targetBrokers.iterator().next(), 0, 0, -1, -1, -1));
+ Map<Integer, TopicQueueMappingOne> map = TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
+ TopicQueueMappingUtils.checkIfReusePhysicalQueue(map.values());
+ } catch (RuntimeException ignore) {
+ exceptionNum++;
+ items.remove(items.size() - 1);
+ Map<Integer, TopicQueueMappingOne> map = TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
+ TopicQueueMappingUtils.checkIfReusePhysicalQueue(map.values());
+ }
+ Assert.assertEquals(6, exceptionNum);
+
+ }
}