You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 02:16:16 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Ignore the existed queueId in static topi creation
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 45946d4 Ignore the existed queueId in static topi creation
45946d4 is described below
commit 45946d475697f0b78e4d3b31105f75bb76e7ee19
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 10:15:44 2021 +0800
Ignore the existed queueId in static topi creation
---
.../command/topic/UpdateStaticTopicSubCommand.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 5cee8c8..29a7261 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.tools.command.topic;
import com.google.common.collect.ImmutableList;
+import com.sun.xml.internal.ws.api.BindingIDFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
@@ -201,22 +202,28 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
//construct the topic configAndMapping
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
- newIdToBroker.forEach( (queueId, broker) -> {
+ for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
+ Integer queueId = e.getKey();
+ String value = e.getValue();
+ if (globalIdMap.containsKey(queueId)) {
+ //ignore the exited
+ continue;
+ }
TopicConfigAndQueueMapping configMapping;
- if (!existedTopicConfigMap.containsKey(broker)) {
+ if (!existedTopicConfigMap.containsKey(value)) {
TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
- TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, broker, epoch);
+ TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, value, epoch);
configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
} else {
- configMapping = existedTopicConfigMap.get(broker);
+ configMapping = existedTopicConfigMap.get(value);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
}
- LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
+ LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
- });
+ }
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {