You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 04:00:49 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Init the remapping command
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 25a588b Init the remapping command
25a588b is described below
commit 25a588b81fd58dccb1e5f81e8a65859570a76a89
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 12:00:15 2021 +0800
Init the remapping command
---
.../rocketmq/common/TopicQueueMappingOne.java | 54 ++++++
.../rocketmq/common/TopicQueueMappingUtils.java | 6 +-
...nd.java => RemappingStaticTopicSubCommand.java} | 192 ++++++++++++---------
.../command/topic/UpdateStaticTopicSubCommand.java | 38 ++--
4 files changed, 190 insertions(+), 100 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java
new file mode 100644
index 0000000..150a208
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TopicQueueMappingOne extends RemotingSerializable {
+
+ String topic; // redundant field
+ String bname; //identify the hosted broker name
+ Integer globalId;
+ ImmutableList<LogicQueueMappingItem> items;
+
+ public TopicQueueMappingOne(String topic, String bname, Integer globalId, ImmutableList<LogicQueueMappingItem> items) {
+ this.topic = topic;
+ this.bname = bname;
+ this.globalId = globalId;
+ this.items = items;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getBname() {
+ return bname;
+ }
+
+ public Integer getGlobalId() {
+ return globalId;
+ }
+
+ public ImmutableList<LogicQueueMappingItem> getItems() {
+ return items;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
index 686208a..ff89aaf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
@@ -103,7 +103,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
}
- public static Map<Integer, ImmutableList<LogicQueueMappingItem>> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
+ public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override
public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
@@ -111,7 +111,7 @@ public class TopicQueueMappingUtils {
}
});
- Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+ Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
@@ -125,7 +125,7 @@ public class TopicQueueMappingUtils {
throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
}
} else {
- globalIdMap.put(globalid, entry.getValue());
+ globalIdMap.put(globalid, new TopicQueueMappingOne(mappingDetail.topic, mappingDetail.bname, globalid, entry.getValue()));
}
}
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
similarity index 52%
copy from tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
copy to tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 29a7261..4cc5acf 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.tools.command.topic;
import com.google.common.collect.ImmutableList;
-import com.sun.xml.internal.ws.api.BindingIDFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
@@ -26,6 +25,7 @@ import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.TopicQueueMappingOne;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -38,15 +38,17 @@ import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.AbstractMap;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
-public class UpdateStaticTopicSubCommand implements SubCommand {
+public class RemappingStaticTopicSubCommand implements SubCommand {
@Override
public String commandName() {
@@ -64,7 +66,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Option opt = null;
- opt = new Option("c", "clusterName", true, "create topic to which cluster");
+ opt = new Option("c", "clusters", true, "remapping static topic to clusters, comma separated");
+ optionGroup.addOption(opt);
+
+ opt = new Option("b", "brokers", true, "remapping static topic to brokers, comma separated");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
@@ -73,11 +78,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
-
- opt = new Option("qn", "totalQueueNum", true, "total queue num");
- opt.setRequired(true);
- options.addOption(opt);
-
return options;
}
@@ -90,33 +90,46 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
- Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<>();
+ Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
+ Set<String> brokers = new HashSet<>();
+ Map.Entry<Long, Integer> maxEpochAndNum = null;
try {
- if (!commandLine.hasOption('t')
- || !commandLine.hasOption('c')
- || !commandLine.hasOption("qn")) {
+ if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
+ || !commandLine.hasOption('t')) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
String topic = commandLine.getOptionValue('t').trim();
- int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
- String clusters = commandLine.getOptionValue('c').trim();
+
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
- } else {
- clientMetadata.refreshClusterInfo(clusterInfo);
}
- Set<String> brokers = new HashSet<>();
- for (String cluster : clusters.split(",")) {
- cluster = cluster.trim();
- if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
- brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+ clientMetadata.refreshClusterInfo(clusterInfo);
+
+ if (commandLine.hasOption("b")) {
+ String brokerStrs = commandLine.getOptionValue("b").trim();
+ for (String broker: brokerStrs.split(",")) {
+ brokers.add(broker.trim());
+ }
+ } else if (commandLine.hasOption("c")) {
+ String clusters = commandLine.getOptionValue('c').trim();
+ for (String cluster : clusters.split(",")) {
+ cluster = cluster.trim();
+ if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
+ brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+ }
}
}
if (brokers.isEmpty()) {
- throw new RuntimeException("Find none brokers for " + clusters);
+ throw new RuntimeException("Find none brokers");
+ }
+ for (String broker : brokers) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ if (addr == null) {
+ throw new RuntimeException("Can't find addr for broker " + broker);
+ }
}
//get the existed topic config and mapping
@@ -135,93 +148,106 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
}
- Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
- if (!existedTopicConfigMap.isEmpty()) {
- //make sure it it not null
- existedTopicConfigMap.forEach((key, value) -> {
- if (value.getMappingDetail() != null) {
- throw new RuntimeException("Mapping info should be null in broker " + key);
- }
- });
- //make sure the detail is not dirty
- existedTopicConfigMap.forEach((key, value) -> {
- if (!key.equals(value.getMappingDetail().getBname())) {
- throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
- }
- if (value.getMappingDetail().isDirty()) {
- throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname());
- }
- });
-
- List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
- //check the epoch and qnum
- maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
- final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
- detailList.forEach( mappingDetail -> {
- if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
- throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
- }
- if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
- throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
- }
- });
+ if (existedTopicConfigMap.isEmpty()) {
+ throw new RuntimeException("No topic route to do the remapping");
+ }
- globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
+ //make sure it it not null
+ existedTopicConfigMap.forEach((key, value) -> {
+ if (value.getMappingDetail() != null) {
+ throw new RuntimeException("Mapping info should be null in broker " + key);
+ }
+ });
+ //make sure the detail is not dirty
+ existedTopicConfigMap.forEach((key, value) -> {
+ if (!key.equals(value.getMappingDetail().getBname())) {
+ throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
+ }
+ if (value.getMappingDetail().isDirty()) {
+ throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname());
+ }
+ });
- if (maxEpochAndNum.getValue() != globalIdMap.size()) {
- throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
+ List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
+ //check the epoch and qnum
+ maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
+ final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
+ detailList.forEach( mappingDetail -> {
+ if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
+ throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
}
- for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
- if (!globalIdMap.containsKey(i)) {
- throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
- }
+ if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
+ throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
+ });
+
+ globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
+
+ if (maxEpochAndNum.getValue() != globalIdMap.size()) {
+ throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
}
- if (queueNum < globalIdMap.size()) {
- throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
- }
- //check the queue number
- if (queueNum == globalIdMap.size()) {
- throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
+ for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
+ if (!globalIdMap.containsKey(i)) {
+ throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
+ }
}
+
//the check is ok, now do the mapping allocation
- Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
- Map<Integer, String> idToBroker = new HashMap<>();
- globalIdMap.forEach((key, value) -> {
- String leaderbroker = TopicQueueMappingUtils.getLeaderBroker(value);
- idToBroker.put(key, leaderbroker);
- if (!brokerNumMap.containsKey(leaderbroker)) {
- brokerNumMap.put(leaderbroker, 1);
+ int maxNum = maxEpochAndNum.getValue();
+ TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
+ allocator.upToNum(maxNum);
+ Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
+ Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
+ Map<Integer, String> expectedIdToBroker = new HashMap<>();
+ //the following logic will make sure that, for one broker, only "take in" or "take out" queues
+ //It can't, take in some queues but alse take out some queues.
+ globalIdMap.forEach((queueId, mappingOne) -> {
+ String leaderBroker = mappingOne.getBname();
+ if (expectedBrokerNumMap.containsKey(leaderBroker)) {
+ if (expectedBrokerNumMap.get(leaderBroker) > 0) {
+ expectedIdToBroker.put(queueId, leaderBroker);
+ expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1);
+ } else {
+ waitAssignQueues.add(queueId);
+ expectedBrokerNumMap.remove(leaderBroker);
+ }
} else {
- brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
+ waitAssignQueues.add(queueId);
}
});
- TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap);
- allocator.upToNum(queueNum);
- Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
+ expectedBrokerNumMap.forEach((broker, queueNum) -> {
+ for (int i = 0; i < queueNum; i++) {
+ expectedIdToBroker.put(waitAssignQueues.poll(), broker);
+ }
+ });
+
+ Set<Broker>
+
+ //Now construct the remapping info
//construct the topic configAndMapping
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
- for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
+ for (Map.Entry<Integer, String> e : expectedIdToBroker.entrySet()) {
Integer queueId = e.getKey();
- String value = e.getValue();
+ String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
- if (!existedTopicConfigMap.containsKey(value)) {
+ if (!existedTopicConfigMap.containsKey(broker)) {
TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
- TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, value, epoch);
+ TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker, epoch);
configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
+ existedTopicConfigMap.put(broker, configMapping);
} else {
- configMapping = existedTopicConfigMap.get(value);
+ configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.getMappingDetail().setEpoch(epoch);
- configMapping.getMappingDetail().setTotalQueues(queueNum);
+ configMapping.getMappingDetail().setTotalQueues(0);
}
- LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0, -1, -1, -1);
+ LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 29a7261..a1ff0b0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.TopicQueueMappingOne;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -64,7 +65,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Option opt = null;
- opt = new Option("c", "clusterName", true, "create topic to which cluster");
+ opt = new Option("c", "clusters", true, "create topic to clusters, comma separated");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
@@ -90,7 +91,9 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
- Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<>();
+ Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
+ Set<String> brokers = new HashSet<>();
+
try {
if (!commandLine.hasOption('t')
|| !commandLine.hasOption('c')
@@ -108,7 +111,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
} else {
clientMetadata.refreshClusterInfo(clusterInfo);
}
- Set<String> brokers = new HashSet<>();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
@@ -118,6 +120,12 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
if (brokers.isEmpty()) {
throw new RuntimeException("Find none brokers for " + clusters);
}
+ for (String broker : brokers) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ if (addr == null) {
+ throw new RuntimeException("Can't find addr for broker " + broker);
+ }
+ }
//get the existed topic config and mapping
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
@@ -188,7 +196,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
Map<Integer, String> idToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> {
- String leaderbroker = TopicQueueMappingUtils.getLeaderBroker(value);
+ String leaderbroker = value.getBname();
idToBroker.put(key, leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
@@ -204,27 +212,29 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
- String value = e.getValue();
+ String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
- if (!existedTopicConfigMap.containsKey(value)) {
- TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
- TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, value, epoch);
- configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
+ if (!existedTopicConfigMap.containsKey(broker)) {
+ configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
+ configMapping.setWriteQueueNums(1);
+ configMapping.setReadQueueNums(1);
+ existedTopicConfigMap.put(broker, configMapping);
} else {
- configMapping = existedTopicConfigMap.get(value);
+ configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
- configMapping.getMappingDetail().setEpoch(epoch);
- configMapping.getMappingDetail().setTotalQueues(queueNum);
}
- LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0, -1, -1, -1);
+ LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
-
+ existedTopicConfigMap.values().forEach( configMapping -> {
+ configMapping.getMappingDetail().setEpoch(epoch);
+ configMapping.getMappingDetail().setTotalQueues(queueNum);
+ });
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();