You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/25 12:24:25 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Fix the stability of remapping
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 2ee5897 Fix the stability of remapping
2ee5897 is described below
commit 2ee5897177e0f37dcbe1aa4985934c972a9d691d
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 25 20:23:59 2021 +0800
Fix the stability of remapping
---
.../statictopic/TopicQueueMappingDetail.java | 2 +
.../common/statictopic/TopicQueueMappingUtils.java | 70 ++++++++++++++---
.../common/statictopic/TopicMappingUtilsTest.java | 90 ++++++++++++++++++----
.../apache/rocketmq/test/smoke/StaticTopicIT.java | 3 +-
.../command/topic/UpdateStaticTopicSubCommand.java | 2 +-
5 files changed, 139 insertions(+), 28 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 30db209..86a6cec 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -39,6 +39,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
super(topic, totalQueues, bname, epoch);
}
+
+
public static boolean putMappingInfo(TopicQueueMappingDetail mappingDetail, Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
if (mappingInfo.isEmpty()) {
return true;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index ea6bc61..ef565a0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -39,12 +39,15 @@ public class TopicQueueMappingUtils {
public static class MappingAllocator {
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
Map<Integer, String> idToBroker = new HashMap<Integer, String>();
+ //used for remapping
+ Map<String, Integer> brokerNumMapBeforeRemapping = null;
int currentIndex = 0;
Random random = new Random();
List<String> leastBrokers = new ArrayList<String>();
- private MappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
+ private MappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap, Map<String, Integer> brokerNumMapBeforeRemapping) {
this.idToBroker.putAll(idToBroker);
this.brokerNumMap.putAll(brokerNumMap);
+ this.brokerNumMapBeforeRemapping = brokerNumMapBeforeRemapping;
}
private void freshState() {
@@ -58,7 +61,27 @@ public class TopicQueueMappingUtils {
leastBrokers.add(entry.getKey());
}
}
- currentIndex = random.nextInt(leastBrokers.size());
+ //reduce the remapping
+ if (brokerNumMapBeforeRemapping != null
+ && !brokerNumMapBeforeRemapping.isEmpty()) {
+ Collections.sort(leastBrokers, new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ int i1 = 0, i2 = 0;
+ if (brokerNumMapBeforeRemapping.containsKey(o1)) {
+ i1 = brokerNumMapBeforeRemapping.get(o1);
+ }
+ if (brokerNumMapBeforeRemapping.containsKey(o2)) {
+ i2 = brokerNumMapBeforeRemapping.get(o2);
+ }
+ return i1 - i2;
+ }
+ });
+ } else {
+ //reduce the imbalance
+ Collections.shuffle(leastBrokers);
+ }
+ currentIndex = leastBrokers.size() - 1;
}
private String nextBroker() {
if (leastBrokers.isEmpty()) {
@@ -93,8 +116,9 @@ public class TopicQueueMappingUtils {
}
}
- public static MappingAllocator buildMappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
- return new MappingAllocator(idToBroker, brokerNumMap);
+
+ public static MappingAllocator buildMappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap, Map<String, Integer> brokerNumMapBeforeRemapping) {
+ return new MappingAllocator(idToBroker, brokerNumMap, brokerNumMapBeforeRemapping);
}
public static Map.Entry<Long, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
@@ -367,16 +391,28 @@ public class TopicQueueMappingUtils {
}
}
- public static void checkIfTargetBrokersComplete(Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+ public static void checkTargetBrokersComplete(Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
for (String broker : brokerConfigMap.keySet()) {
+ if (brokerConfigMap.get(broker).getMappingDetail().getHostedQueues().isEmpty()) {
+ continue;
+ }
if (!targetBrokers.contains(broker)) {
throw new RuntimeException("The existed broker " + broker + " dose not in target brokers ");
}
}
}
- public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
- checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
+ public static void checkNonTargetBrokers(Set<String> targetBrokers, Set<String> nonTargetBrokers) {
+ for (String broker : nonTargetBrokers) {
+ if (targetBrokers.contains(broker)) {
+ throw new RuntimeException("The non-target broker exist in target broker");
+ }
+ }
+ }
+
+ public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Set<String> nonTargetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+ checkTargetBrokersComplete(targetBrokers, brokerConfigMap);
+ checkNonTargetBrokers(targetBrokers, nonTargetBrokers);
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
@@ -408,7 +444,7 @@ public class TopicQueueMappingUtils {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
}
}
- TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
+ TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap, null);
allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
@@ -436,6 +472,12 @@ public class TopicQueueMappingUtils {
TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(mappingItem)));
}
+ //set the non target brokers
+ for (String broker : nonTargetBrokers) {
+ if (!brokerConfigMap.containsKey(broker)) {
+ brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch)));
+ }
+ }
// set the topic config
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
@@ -466,10 +508,20 @@ public class TopicQueueMappingUtils {
for (String broker: targetBrokers) {
brokerNumMap.put(broker, 0);
}
- TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
+ Map<String, Integer> brokerNumMapBeforeRemapping = new HashMap<String, Integer>();
+ for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
+ if(brokerNumMapBeforeRemapping.containsKey(mappingOne.bname)) {
+ brokerNumMapBeforeRemapping.put(mappingOne.bname, brokerNumMapBeforeRemapping.get(mappingOne.bname) + 1);
+ } else {
+ brokerNumMapBeforeRemapping.put(mappingOne.bname, 1);
+ }
+ }
+
+ TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap, brokerNumMapBeforeRemapping);
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
+ //cannot directly use the idBrokerMap from allocator, for the number of globalId maybe not in the natural order
Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
//the following logic will make sure that, for one broker, either "map in" or "map out"
//It can't both, map in some queues but also map out some queues.
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
index 1b0ad54..bd4b13c 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
@@ -10,6 +10,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
public class TopicMappingUtilsTest {
@@ -35,6 +36,18 @@ public class TopicMappingUtilsTest {
return map;
}
+ private Map<String, Integer> buildBrokerNumMap(int num, int queues) {
+ Map<String, Integer> map = new HashMap<String, Integer>();
+ int random = new Random().nextInt(num);
+ for (int i = 0; i < num; i++) {
+ map.put("broker" + i, queues);
+ if (i == random) {
+ map.put("broker" + i, queues + 1);
+ }
+ }
+ return map;
+ }
+
private void testIdToBroker(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
Map<String, Integer> brokerNumOther = new HashMap<String, Integer>();
for (int i = 0; i < idToBroker.size(); i++) {
@@ -58,7 +71,7 @@ public class TopicMappingUtilsTest {
for (int i = 0; i < 10; i++) {
int num = 3;
Map<String, Integer> brokerNumMap = buildBrokerNumMap(num);
- TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
+ TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap, null);
allocator.upToNum(num * 2);
for (Map.Entry<String, Integer> entry: allocator.getBrokerNumMap().entrySet()) {
Assert.assertEquals(2L, entry.getValue().longValue());
@@ -77,6 +90,18 @@ public class TopicMappingUtilsTest {
}
}
+ @Test
+ public void testRemappingAllocator() {
+ for (int i = 0; i < 10; i++) {
+ int num = (i + 2) * 2;
+ Map<String, Integer> brokerNumMap = buildBrokerNumMap(num);
+ Map<String, Integer> brokerNumMapBeforeRemapping = buildBrokerNumMap(num, num);
+ TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap, brokerNumMapBeforeRemapping);
+ allocator.upToNum(num * num + 1);
+ Assert.assertEquals(brokerNumMapBeforeRemapping, allocator.getBrokerNumMap());
+ }
+ }
+
@Test(expected = RuntimeException.class)
public void testTargetBrokersComplete() {
@@ -86,8 +111,10 @@ public class TopicMappingUtilsTest {
Set<String> targetBrokers = new HashSet<String>();
targetBrokers.add(broker1);
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
- brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, 0, broker2, 0)));
- TopicQueueMappingUtils.checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap);
+ TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker2, 0);
+ mappingDetail.getHostedQueues().put(1, new ArrayList<LogicQueueMappingItem>());
+ brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), mappingDetail));
+ TopicQueueMappingUtils.checkTargetBrokersComplete(targetBrokers, brokerConfigMap);
}
@@ -99,28 +126,36 @@ public class TopicMappingUtilsTest {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
for (int i = 1; i < 10; i++) {
Set<String> targetBrokers = buildTargetBrokers(2 * i);
+ Set<String> nonTargetBrokers = buildTargetBrokers(2 * i, "test");
queueNum = 10 * i;
- TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, nonTargetBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
- Assert.assertEquals(2 * i, brokerConfigMap.size());
+ Assert.assertEquals(4 * i, brokerConfigMap.size());
//do the check manually
- TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
+ Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
+ Assert.assertEquals(queueNum, maxEpochAndNum.getValue().longValue());
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values());
TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
- Assert.assertEquals(5, configMapping.getReadQueueNums());
- Assert.assertEquals(5, configMapping.getWriteQueueNums());
- Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis());
- for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
- for (LogicQueueMappingItem item: items) {
- Assert.assertEquals(0, item.getStartOffset());
- Assert.assertEquals(0, item.getLogicOffset());
- TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
- Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums());
+ if (nonTargetBrokers.contains(configMapping.getMappingDetail().bname)) {
+ Assert.assertEquals(0, configMapping.getReadQueueNums());
+ Assert.assertEquals(0, configMapping.getWriteQueueNums());
+ Assert.assertEquals(0, configMapping.getMappingDetail().getHostedQueues().size());
+ } else {
+ Assert.assertEquals(5, configMapping.getReadQueueNums());
+ Assert.assertEquals(5, configMapping.getWriteQueueNums());
+ Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis());
+ for (List<LogicQueueMappingItem> items: configMapping.getMappingDetail().getHostedQueues().values()) {
+ for (LogicQueueMappingItem item: items) {
+ Assert.assertEquals(0, item.getStartOffset());
+ Assert.assertEquals(0, item.getLogicOffset());
+ TopicConfig topicConfig = brokerConfigMap.get(item.getBname());
+ Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums());
+ }
}
}
}
@@ -133,7 +168,7 @@ public class TopicMappingUtilsTest {
int queueNum = 7;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> originalBrokers = buildTargetBrokers(2);
- TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap);
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
@@ -171,6 +206,27 @@ public class TopicMappingUtilsTest {
}
}
+ @Test
+ public void testRemappingStaticTopicStability() {
+ String topic = "static";
+ int queueNum = 7;
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+ Set<String> originalBrokers = buildTargetBrokers(2);
+ {
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap);
+ Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
+ Assert.assertEquals(2, brokerConfigMap.size());
+ }
+ for (int i = 0; i < 10; i++) {
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, originalBrokers);
+ Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
+ Assert.assertEquals(2, brokerConfigMap.size());
+ Assert.assertTrue(wrapper.getBrokerToMapIn().isEmpty());
+ Assert.assertTrue(wrapper.getBrokerToMapOut().isEmpty());
+ }
+ }
+
+
@Test
public void testUtilsCheck() {
@@ -178,7 +234,7 @@ public class TopicMappingUtilsTest {
int queueNum = 10;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> targetBrokers = buildTargetBrokers(2);
- TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<String>(), brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index a5656f3..a4fa864 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -16,6 +16,7 @@ import org.junit.FixMethodOrder;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -45,7 +46,7 @@ public class StaticTopicIT extends BaseConf {
public Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Assert.assertTrue(brokerConfigMap.isEmpty());
- TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+ TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 85e2cc5..d6a680a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -217,7 +217,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
//calculate the new data
- TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+ TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
{
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);