You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/25 07:12:17 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Test utils for static topic
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 1dacb66 Test utils for static topic
1dacb66 is described below
commit 1dacb667e082c90b80bd4d6c6f9c9bfd6942b0a7
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 25 15:10:50 2021 +0800
Test utils for static topic
---
.../common/statictopic/TopicQueueMappingUtils.java | 66 ++++++++++++++++++++--
.../common/statictopic/TopicMappingUtilsTest.java | 63 +++++++++++++++++++++
2 files changed, 123 insertions(+), 6 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 5f843a9..e8c02a7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -234,6 +234,43 @@ public class TopicQueueMappingUtils {
}
}
+ /**
+  * Verifies that no physical queue is referenced by more than one mapping item.
+  * A physical queue is identified by the item's broker name and queue id joined with "-".
+  *
+  * @param mappingOnes the logic-queue mappings whose items are inspected
+  * @throws RuntimeException when a physical queue id is encountered a second time
+  */
+ public static void checkIfReusePhysicalQueue(Collection<TopicQueueMappingOne> mappingOnes) {
+     // Physical queue id -> the mapping that claimed it first.
+     Map<String, TopicQueueMappingOne> ownerByPhysicalQueue = new HashMap<String, TopicQueueMappingOne>();
+     for (TopicQueueMappingOne current : mappingOnes) {
+         for (LogicQueueMappingItem mappingItem : current.items) {
+             String physicalQueueId = mappingItem.getBname() + "-" + mappingItem.getQueueId();
+             TopicQueueMappingOne firstOwner = ownerByPhysicalQueue.get(physicalQueueId);
+             if (firstOwner == null) {
+                 ownerByPhysicalQueue.put(physicalQueueId, current);
+                 continue;
+             }
+             throw new RuntimeException(String.format("Topic %s global queue id %d and %d shared the same physical queue %s",
+                 current.topic, current.globalId, firstOwner.globalId, physicalQueueId));
+         }
+     }
+ }
+
+ /**
+  * Checks that every broker's topic config is consistent with the physical queues
+  * referenced by its hosted mapping items.
+  *
+  * For each broker config: the read queue number must not be smaller than the write
+  * queue number. For each hosted mapping item: the start offset must be 0, the broker
+  * named by the item must be present in {@code brokerConfigMap}, and the item's queue id
+  * must be below that broker's write queue number.
+  *
+  * @param brokerConfigMap broker name -> topic config together with its queue mapping
+  * @throws RuntimeException if any of the checks above fails
+  */
+ public static void checkPhysicalQueueConsistence(Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+     for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+         TopicConfigAndQueueMapping configMapping = entry.getValue();
+         // These asserts only fire when the JVM runs with assertions enabled (-ea).
+         assert configMapping != null;
+         assert configMapping.getMappingDetail() != null;
+         if (configMapping.getReadQueueNums() < configMapping.getWriteQueueNums()) {
+             throw new RuntimeException("Read queues is smaller than write queues");
+         }
+         for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
+             for (LogicQueueMappingItem item: items) {
+                 if (item.getStartOffset() != 0) {
+                     throw new RuntimeException("The start offset dose not begin from 0");
+                 }
+                 TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
+                 // An item may name a broker that has no entry in the map; report that
+                 // explicitly instead of failing with a NullPointerException below.
+                 if (topicConfig == null) {
+                     throw new RuntimeException("The broker " + item.getBname() + " of the mapping item does not exist in the broker config map");
+                 }
+                 if (item.getQueueId() >= topicConfig.getWriteQueueNums()) {
+                     throw new RuntimeException("The physical queue id is overflow the write queues");
+                 }
+             }
+         }
+     }
+ }
+
public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override
@@ -275,6 +312,7 @@ public class TopicQueueMappingUtils {
}
}
}
+ checkIfReusePhysicalQueue(globalIdMap.values());
return globalIdMap;
}
@@ -312,12 +350,23 @@ public class TopicQueueMappingUtils {
}
}
+ /**
+  * Ensures that every broker already present in {@code brokerConfigMap} is also listed
+  * in {@code targetBrokers}.
+  *
+  * @param targetBrokers the broker names requested as targets
+  * @param brokerConfigMap broker name -> existing topic config and queue mapping
+  * @throws RuntimeException for the first existing broker that is missing from the targets
+  */
+ public static void checkIfTargetBrokersComplete(Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+     for (String existedBroker : brokerConfigMap.keySet()) {
+         if (targetBrokers.contains(existedBroker)) {
+             continue;
+         }
+         throw new RuntimeException("The existed broker " + existedBroker + " dose not in target brokers ");
+     }
+ }
+
public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+ checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+ checkIfReusePhysicalQueue(globalIdMap.values());
+ checkPhysicalQueueConsistence(brokerConfigMap);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
@@ -377,9 +426,12 @@ public class TopicQueueMappingUtils {
configMapping.getMappingDetail().setTotalQueues(queueNum);
}
//double check the config
- TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
- TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
-
+ {
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+ checkIfReusePhysicalQueue(globalIdMap.values());
+ checkPhysicalQueueConsistence(brokerConfigMap);
+ }
return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, brokerConfigMap, new HashSet<String>(), new HashSet<String>());
}
@@ -454,7 +506,7 @@ public class TopicQueueMappingUtils {
List<LogicQueueMappingItem> items = new ArrayList<LogicQueueMappingItem>(topicQueueMappingOne.getItems());
LogicQueueMappingItem last = items.get(items.size() - 1);
- items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
+ items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, 0, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object
@@ -469,8 +521,10 @@ public class TopicQueueMappingUtils {
}
//double check
- TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
- TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+ {
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ }
+
return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut);
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
index 6a61de5..c82a690 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
@@ -1,14 +1,28 @@
package org.apache.rocketmq.common.statictopic;
+import org.apache.rocketmq.common.TopicConfig;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
public class TopicMappingUtilsTest {
+
+ /**
+  * Builds the broker name set {"broker0", ..., "broker(num-1)"}; empty when num is not positive.
+  */
+ private Set<String> buildTargetBrokers(int num) {
+     Set<String> brokerNames = new HashSet<String>();
+     int index = 0;
+     while (index < num) {
+         brokerNames.add("broker" + index);
+         index++;
+     }
+     return brokerNames;
+ }
+
private Map<String, Integer> buildBrokerNumMap(int num) {
Map<String, Integer> map = new HashMap<String, Integer>();
for (int i = 0; i < num; i++) {
@@ -58,4 +72,53 @@ public class TopicMappingUtilsTest {
testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap());
}
}
+
+
+ // The existing config lives on "broker2" while only "broker1" is a target,
+ // so the completeness check is expected to throw.
+ @Test(expected = RuntimeException.class)
+ public void testTargetBrokersComplete() {
+     String topic = "static";
+     String targetBroker = "broker1";
+     String existedBroker = "broker2";
+
+     Set<String> targetBrokers = new HashSet<String>();
+     targetBrokers.add(targetBroker);
+
+     TopicConfig existedConfig = new TopicConfig(topic, 0, 0);
+     TopicQueueMappingDetail existedDetail = new TopicQueueMappingDetail(topic, 0, existedBroker, 0);
+     Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+     brokerConfigMap.put(existedBroker, new TopicConfigAndQueueMapping(existedConfig, existedDetail));
+
+     TopicQueueMappingUtils.checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
+ }
+
+
+
+ @Test
+ public void testCreateStaticTopic() {
+ String topic = "static";
+ int queueNum;
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+ for (int i = 1; i < 10; i++) {
+ Set<String> targetBrokers = buildTargetBrokers(2 * i);
+ queueNum = 10 * i;
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+ Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
+ Assert.assertEquals(2 * i, brokerConfigMap.size());
+
+ //do the check manually
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+ TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
+ TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
+
+ for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+ TopicConfigAndQueueMapping configMapping = entry.getValue();
+ Assert.assertEquals(5, configMapping.getReadQueueNums());
+ Assert.assertEquals(5, configMapping.getWriteQueueNums());
+ for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
+ for (LogicQueueMappingItem item: items) {
+ Assert.assertEquals(0, item.getStartOffset());
+ Assert.assertEquals(0, item.getLogicOffset());
+ TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
+ Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums());
+ }
+ }
+ }
+ }
+ }
}