You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 08:57:45 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Enable to run from file for createStaticTopic command
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 165e133 Enable to run from file for createStaticTopic command
165e133 is described below
commit 165e133a64ae34fa992223f3472f6398d3c9cb2a
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 16:57:19 2021 +0800
Enable to run from file for createStaticTopic command
---
.../protocol/body/TopicConfigSerializeWrapper.java | 6 +-
.../common/statictopic/TopicQueueMappingUtils.java | 77 ++++++++++
.../statictopic/TopicRemappingDetailWrapper.java | 47 +++++++
.../{ => statictopic}/TopicQueueMappingTest.java | 12 +-
.../command/topic/UpdateStaticTopicSubCommand.java | 156 +++++++++++++--------
5 files changed, 236 insertions(+), 62 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
index 1176e6b..a71c50a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -17,13 +17,13 @@
package org.apache.rocketmq.common.protocol.body;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index dfb6bbf..9784199 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -17,9 +17,12 @@
package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList;
+import org.apache.rocketmq.common.MixAll;
+import java.io.File;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -103,6 +106,65 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
}
+    /**
+     * Collects the mapping detail of every config that has one.
+     *
+     * <p>Configs whose {@code getMappingDetail()} is {@code null} are left out; the order of
+     * the result follows the iteration order of {@code configs}.
+     *
+     * @param configs the topic configs to inspect
+     * @return a new list holding the non-null mapping details
+     */
+    public static List<TopicQueueMappingDetail> getMappingDetailFromConfig(Collection<TopicConfigAndQueueMapping> configs) {
+        final List<TopicQueueMappingDetail> result = new ArrayList<TopicQueueMappingDetail>();
+        for (TopicConfigAndQueueMapping each : configs) {
+            final TopicQueueMappingDetail detail = each.getMappingDetail();
+            if (detail == null) {
+                continue;
+            }
+            result.add(detail);
+        }
+        return result;
+    }
+
+    /**
+     * Verifies that the per-broker configs of one static topic agree with each other.
+     *
+     * <p>Every entry must carry a mapping detail whose broker name equals the map key, which
+     * is not dirty, and whose topic equals the topic name of its config. In addition all
+     * entries must share the same topic, epoch and total queue number.
+     *
+     * @param brokerConfigMap broker name to topic config and queue mapping
+     * @return the common (epoch, total queue number) pair, or {@code null} when the map is
+     *         {@code null} or empty
+     * @throws RuntimeException on the first inconsistency that is found
+     */
+    public static Map.Entry<Long, Integer> validConsistenceOfTopicConfigAndQueueMapping(Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+        if (brokerConfigMap == null
+            || brokerConfigMap.isEmpty()) {
+            return null;
+        }
+        // Values of the first broker visited; every later broker is compared against them.
+        // An explicit flag marks "nothing seen yet" so that a detail whose epoch really is -1
+        // cannot switch the comparison off.
+        boolean first = true;
+        String topic = null;
+        long epoch = -1;
+        int totalQueues = -1;
+        for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+            String broker = entry.getKey();
+            TopicConfigAndQueueMapping configMapping = entry.getValue();
+            //make sure it is not null
+            if (configMapping == null
+                || configMapping.getMappingDetail() == null) {
+                throw new RuntimeException("Mapping info should not be null in broker " + broker);
+            }
+            TopicQueueMappingDetail mappingDetail = configMapping.getMappingDetail();
+            if (!broker.equals(mappingDetail.getBname())) {
+                throw new RuntimeException(String.format("The broker name is not equal %s != %s ", broker, mappingDetail.getBname()));
+            }
+            if (mappingDetail.isDirty()) {
+                throw new RuntimeException("The mapping info is dirty in broker " + broker);
+            }
+            // A missing topic name is reported as an inconsistency instead of a bare NPE.
+            String configTopic = configMapping.getTopicName();
+            if (configTopic == null
+                || !configTopic.equals(mappingDetail.getTopic())) {
+                throw new RuntimeException("The topic name is inconsistent in broker " + broker);
+            }
+
+            if (first) {
+                topic = mappingDetail.getTopic();
+                epoch = mappingDetail.getEpoch();
+                totalQueues = mappingDetail.getTotalQueues();
+                first = false;
+                continue;
+            }
+
+            if (!topic.equals(mappingDetail.getTopic())) {
+                throw new RuntimeException("The topic name is inconsistent in broker " + broker);
+            }
+            if (epoch != mappingDetail.getEpoch()) {
+                throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", epoch, mappingDetail.getEpoch(), mappingDetail.getBname()));
+            }
+            if (totalQueues != mappingDetail.getTotalQueues()) {
+                throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", totalQueues, mappingDetail.getTotalQueues(), mappingDetail.getBname()));
+            }
+        }
+        return new AbstractMap.SimpleEntry<Long, Integer>(epoch, totalQueues);
+    }
+
public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override
@@ -153,4 +215,19 @@ public class TopicQueueMappingUtils {
assert items.size() > 0;
return items.get(items.size() - 1);
}
+
+    /**
+     * Writes the JSON form of the wrapper to a file in the JVM temp directory.
+     *
+     * <p>The file is placed under the directory named by the {@code java.io.tmpdir} system
+     * property and is called {@code <topic>-<epoch>-<suffix>}.
+     *
+     * @param wrapper the mapping snapshot to persist
+     * @param suffix trailing part of the file name
+     * @return the path of the file that was written
+     * @throws RuntimeException if writing fails; the original exception is kept as the cause
+     */
+    public static String writeToTemp(TopicRemappingDetailWrapper wrapper, String suffix) {
+        final String topic = wrapper.getTopic();
+        final String json = wrapper.toJson();
+        final StringBuilder path = new StringBuilder();
+        path.append(System.getProperty("java.io.tmpdir"))
+            .append(File.separator)
+            .append(topic)
+            .append("-")
+            .append(wrapper.getEpoch())
+            .append("-")
+            .append(suffix);
+        final String fileName = path.toString();
+        try {
+            MixAll.string2File(json, fileName);
+        } catch (Exception e) {
+            throw new RuntimeException("write file failed " + fileName, e);
+        }
+        return fileName;
+    }
+
+
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
new file mode 100644
index 0000000..d229203
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
@@ -0,0 +1,47 @@
+package org.apache.rocketmq.common.statictopic;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Map;
+/**
+ * Serializable snapshot of the queue mapping of one static topic: the topic name, the kind
+ * of operation, the mapping epoch, the expected queue-id-to-broker assignment and the
+ * per-broker topic config together with its mapping.
+ *
+ * NOTE(review): instances are decoded from JSON by the tools command, yet the class offers
+ * only the all-args constructor and no setters - confirm the JSON library can populate it.
+ */
+public class TopicRemappingDetailWrapper extends RemotingSerializable {
+ public static final String TYPE_CREATE_OR_UPDATE = "CREATE_OR_UPDATE"; // passed as the type when a topic is created or enlarged
+ public static final String TYPE_REMAPPING = "REMAPPING"; // NOTE(review): presumably the type for a remapping run - no usage visible here
+
+ // Identity of the snapshot; these three fields are fixed at construction.
+ private final String topic;
+ private final String type;
+ private final long epoch;
+ private Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>(); // initial value is replaced by the constructor argument
+ // Keyed by broker name; the initial value is likewise replaced by the constructor argument.
+ private Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+ /**
+ * Creates a snapshot. The two maps are stored as given, without copying, so later changes
+ * made by the caller are visible through this object.
+ *
+ * @param topic the topic name
+ * @param type the operation kind, e.g. TYPE_CREATE_OR_UPDATE
+ * @param epoch the epoch of the mapping
+ * @param expectedIdToBroker queue id to the broker expected to own it
+ * @param brokerConfigMap broker name to its topic config and mapping
+ */
+ public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<Integer, String> expectedIdToBroker, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+ this.topic = topic;
+ this.type = type;
+ this.epoch = epoch;
+ this.expectedIdToBroker = expectedIdToBroker;
+ this.brokerConfigMap = brokerConfigMap;
+ }
+ /** @return the topic name */
+ public String getTopic() {
+ return topic;
+ }
+ /** @return the operation kind given at construction */
+ public String getType() {
+ return type;
+ }
+ /** @return the epoch given at construction */
+ public long getEpoch() {
+ return epoch;
+ }
+ /** @return the queue-id-to-broker map given at construction (not a copy) */
+ public Map<Integer, String> getExpectedIdToBroker() {
+ return expectedIdToBroker;
+ }
+ /** @return the broker-to-config map given at construction (not a copy) */
+ public Map<String, TopicConfigAndQueueMapping> getBrokerConfigMap() {
+ return brokerConfigMap;
+ }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
similarity index 88%
rename from common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java
rename to common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
index e6e3528..a1b3d27 100644
--- a/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.common;
+package org.apache.rocketmq.common.statictopic;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
@@ -9,11 +9,19 @@ import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.junit.Assert;
import org.junit.Test;
+import java.io.File;
import java.util.Map;
public class TopicQueueMappingTest {
@Test
+    public void testWriteToFile() {
+        // The temp-file name is built from these two values, so make sure both are usable
+        // instead of only printing them (a test without assertions can never fail).
+        Assert.assertNotNull(System.getProperty("java.io.tmpdir"));
+        Assert.assertFalse(File.separator.isEmpty());
+    }
+
+
+ @Test
public void testJsonSerialize() {
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
String mappingItemJson = JSON.toJSONString(mappingItem) ;
@@ -30,7 +38,7 @@ public class TopicQueueMappingTest {
Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
- TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01");
+ TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
String mappingDetailJson = JSON.toJSONString(mappingDetail);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 65c7377..21ce1cf 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
@@ -31,17 +32,18 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
+import java.nio.charset.Charset;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -81,18 +83,64 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
opt.setRequired(true);
options.addOption(opt);
+        // Optional on purpose: execute() switches to executeFromFile() only when -f is given.
+        // Marking it required would force every invocation down the file path and make the
+        // regular create/update flow unreachable.
+        opt = new Option("f", "mapFile", true, "The map file name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
return options;
}
+
+ public void executeFromFile(final CommandLine commandLine, final Options options,
+ RPCHook rpcHook) throws SubCommandException {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ ClientMetadata clientMetadata = new ClientMetadata();
+
+ try {
+ String mapFileName = commandLine.getOptionValue('f').trim();
+ String mapData = MixAll.file2String(mapFileName);
+ TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
+ //double check the config
+ TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(wrapper.getBrokerConfigMap());
+ TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
+
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ if (clusterInfo == null
+ || clusterInfo.getClusterAddrTable().isEmpty()) {
+ throw new RuntimeException("The Cluster info is empty");
+ }
+ clientMetadata.refreshClusterInfo(clusterInfo);
+ //If some succeed, and others fail, it will cause inconsistent data
+ for (Map.Entry<String, TopicConfigAndQueueMapping> entry : wrapper.getBrokerConfigMap().entrySet()) {
+ String broker = entry.getKey();
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = entry.getValue();
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ }
+ return;
+ }catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+
+
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
+ if (commandLine.hasOption("f")) {
+ executeFromFile(commandLine, options, rpcHook);
+ return;
+ }
+
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
-
ClientMetadata clientMetadata = new ClientMetadata();
- Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
+
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
Set<String> brokers = new HashSet<>();
@@ -111,8 +159,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
clientMetadata.refreshClusterInfo(clusterInfo);
- String topic = commandLine.getOptionValue('t').trim();
- int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
+
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
@@ -131,51 +178,30 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
//get the existed topic config and mapping
- TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
- clientMetadata.freshTopicRoute(topic, routeData);
- if (routeData != null
- && !routeData.getQueueDatas().isEmpty()) {
- for (QueueData queueData: routeData.getQueueDatas()) {
- String bname = queueData.getBrokerName();
- String addr = clientMetadata.findMasterBrokerAddr(bname);
- TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
- //allow the config is null
- if (mapping != null) {
- existedTopicConfigMap.put(bname, mapping);
+ String topic = commandLine.getOptionValue('t').trim();
+ int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
+ {
+ TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ clientMetadata.freshTopicRoute(topic, routeData);
+
+ if (routeData != null
+ && !routeData.getQueueDatas().isEmpty()) {
+ for (QueueData queueData: routeData.getQueueDatas()) {
+ String bname = queueData.getBrokerName();
+ String addr = clientMetadata.findMasterBrokerAddr(bname);
+ TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+ //allow the config is null
+ if (mapping != null) {
+ brokerConfigMap.put(bname, mapping);
+ }
}
}
}
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
- if (!existedTopicConfigMap.isEmpty()) {
- //make sure it it not null
- existedTopicConfigMap.forEach((key, value) -> {
- if (value.getMappingDetail() != null) {
- throw new RuntimeException("Mapping info should be null in broker " + key);
- }
- });
- //make sure the detail is not dirty
- existedTopicConfigMap.forEach((key, value) -> {
- if (!key.equals(value.getMappingDetail().getBname())) {
- throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
- }
- if (value.getMappingDetail().isDirty()) {
- throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname());
- }
- });
-
- List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
- //check the epoch and qnum
- maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
- for (TopicQueueMappingDetail mappingDetail : detailList) {
- if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
- throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
- }
- if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
- throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
- }
- }
- globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true);
+ if (!brokerConfigMap.isEmpty()) {
+ maxEpochAndNum = TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
+ globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
@@ -184,24 +210,32 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
}
+
+ {
+ TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap);
+ String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, "before");
+ System.out.println("The old mapping data is written to file " + oldMappingDataFile);
+ }
+
+
//the check is ok, now do the mapping allocation
Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
- Map<Integer, String> idToBroker = new HashMap<>();
+ final Map<Integer, String> oldIdToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> {
String leaderbroker = value.getBname();
- idToBroker.put(key, leaderbroker);
+ oldIdToBroker.put(key, leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
} else {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
}
});
- TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap);
+ TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping
- long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
+ long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String broker = e.getValue();
@@ -210,13 +244,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
continue;
}
TopicConfigAndQueueMapping configMapping;
- if (!existedTopicConfigMap.containsKey(broker)) {
+ if (!brokerConfigMap.containsKey(broker)) {
configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
configMapping.setWriteQueueNums(1);
configMapping.setReadQueueNums(1);
- existedTopicConfigMap.put(broker, configMapping);
+ brokerConfigMap.put(broker, configMapping);
} else {
- configMapping = existedTopicConfigMap.get(broker);
+ configMapping = brokerConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
@@ -224,15 +258,23 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
- //double check the topic config map
- existedTopicConfigMap.values().forEach( configMapping -> {
- configMapping.getMappingDetail().setEpoch(epoch);
+ // set the topic config
+ brokerConfigMap.values().forEach(configMapping -> {
+ configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
});
- TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
+ //double check the config
+ TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
+ TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+
+ {
+ TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap);
+ String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, "after");
+ System.out.println("The new mapping data is written to file " + newMappingDataFile);
+ }
//If some succeed, and others fail, it will cause inconsistent data
- for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
+ for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();