You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 06:37:25 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the remapping logic
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new e9cafe9 Polish the remapping logic
e9cafe9 is described below
commit e9cafe9fef748b92ea4d020e16bb95192ffaadff
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 14:36:47 2021 +0800
Polish the remapping logic
---
.../topic/RemappingStaticTopicSubCommand.java | 63 +++++++++++++---------
.../command/topic/UpdateStaticTopicSubCommand.java | 2 +-
2 files changed, 38 insertions(+), 27 deletions(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 4cc5acf..34aec17 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -194,6 +194,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
+ long maxEpoch = maxEpochAndNum.getKey();
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
@@ -217,40 +218,50 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
});
expectedBrokerNumMap.forEach((broker, queueNum) -> {
for (int i = 0; i < queueNum; i++) {
- expectedIdToBroker.put(waitAssignQueues.poll(), broker);
+ Integer queueId = waitAssignQueues.poll();
+ assert queueId != null;
+ expectedIdToBroker.put(queueId, broker);
}
});
- Set<Broker>
-
//Now construct the remapping info
-
- //construct the topic configAndMapping
- long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
- for (Map.Entry<Integer, String> e : expectedIdToBroker.entrySet()) {
- Integer queueId = e.getKey();
- String broker = e.getValue();
- if (globalIdMap.containsKey(queueId)) {
- //ignore the exited
+ Set<String> brokersToMapOut = new HashSet<>();
+ Set<String> brokersToMapIn = new HashSet<>();
+ for (Map.Entry<Integer, String> mapEntry : expectedIdToBroker.entrySet()) {
+ Integer queueId = mapEntry.getKey();
+ String broker = mapEntry.getValue();
+ TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId);
+ assert topicQueueMappingOne != null;
+ if (topicQueueMappingOne.getBname().equals(broker)) {
continue;
}
- TopicConfigAndQueueMapping configMapping;
- if (!existedTopicConfigMap.containsKey(broker)) {
- TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
- TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker, epoch);
- configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
- existedTopicConfigMap.put(broker, configMapping);
- } else {
- configMapping = existedTopicConfigMap.get(broker);
- configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
- configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
- configMapping.getMappingDetail().setEpoch(epoch);
- configMapping.getMappingDetail().setTotalQueues(0);
- }
- LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
- configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
+ //remapping
+ String mapInBroker = broker;
+ String mapOutBroker = topicQueueMappingOne.getBname();
+ brokersToMapIn.add(mapInBroker);
+ brokersToMapOut.add(mapOutBroker);
+ TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(mapInBroker);
+ TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(mapOutBroker);
+
+ mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
+ mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
+
+ List<LogicQueueMappingItem> items = new ArrayList<>(topicQueueMappingOne.getItems());
+ LogicQueueMappingItem last = items.get(items.size() - 1);
+ items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
+
+ ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
+ mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
+ mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
+ long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
+ existedTopicConfigMap.values().forEach( configMapping -> {
+ configMapping.getMappingDetail().setEpoch(epoch);
+ configMapping.getMappingDetail().setTotalQueues(maxNum);
+ });
+ //decide the new offset
+
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index a1ff0b0..278ea9f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -226,7 +226,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
} else {
configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
- configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
+ configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));