You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 09:34:10 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the code for command
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new f225fd0 Polish the code for command
f225fd0 is described below
commit f225fd05de44f5e46dc5da34a27fe0b1e947f0ef
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 17:31:41 2021 +0800
Polish the code for command
---
.../common/statictopic/TopicQueueMappingUtils.java | 17 +-
.../statictopic/TopicRemappingDetailWrapper.java | 25 ++
.../topic/RemappingStaticTopicSubCommand.java | 299 ++++++++++++---------
.../command/topic/UpdateStaticTopicSubCommand.java | 26 +-
4 files changed, 225 insertions(+), 142 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 9784199..545b7cf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -116,13 +116,12 @@ public class TopicQueueMappingUtils {
return detailList;
}
- public static Map.Entry<Long, Integer> validConsistenceOfTopicConfigAndQueueMapping(Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+ public static Map.Entry<Long, Integer> checkConsistenceOfTopicConfigAndQueueMapping(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
if (brokerConfigMap == null
|| brokerConfigMap.isEmpty()) {
return null;
}
//make sure it it not null
- String topic = null;
long maxEpoch = -1;
int maxNum = -1;
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
@@ -143,9 +142,7 @@ public class TopicQueueMappingUtils {
}
if (topic != null
&& !topic.equals(mappingDetail.getTopic())) {
- throw new RuntimeException("The topic name is inconsistent in broker " + broker);
- } else {
- topic = mappingDetail.getTopic();
+ throw new RuntimeException("The topic name is not match for broker " + broker);
}
if (maxEpoch != -1
@@ -165,7 +162,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
}
- public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
+ public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override
public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
@@ -216,10 +213,14 @@ public class TopicQueueMappingUtils {
return items.get(items.size() - 1);
}
- public static String writeToTemp(TopicRemappingDetailWrapper wrapper, String suffix) {
+ public static String writeToTemp(TopicRemappingDetailWrapper wrapper, boolean after) {
String topic = wrapper.getTopic();
String data = wrapper.toJson();
- String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + "-" + suffix;
+ String suffix = TopicRemappingDetailWrapper.SUFFIX_BEFORE;
+ if (after) {
+ suffix = TopicRemappingDetailWrapper.SUFFIX_AFTER;
+ }
+ String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + suffix;
try {
MixAll.string2File(data, fileName);
return fileName;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
index d229203..a10dba3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
@@ -3,12 +3,17 @@ package org.apache.rocketmq.common.statictopic;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
public class TopicRemappingDetailWrapper extends RemotingSerializable {
public static final String TYPE_CREATE_OR_UPDATE = "CREATE_OR_UPDATE";
public static final String TYPE_REMAPPING = "REMAPPING";
+ public static final String SUFFIX_BEFORE = ".before";
+ public static final String SUFFIX_AFTER = ".after";
+
private final String topic;
private final String type;
@@ -17,6 +22,10 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
private Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+ private Set<String> brokerToMapIn = new HashSet<String>();
+
+ private Set<String> brokerToMapOut = new HashSet<String>();
+
public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<Integer, String> expectedIdToBroker, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
this.topic = topic;
this.type = type;
@@ -44,4 +53,20 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
public Map<String, TopicConfigAndQueueMapping> getBrokerConfigMap() {
return brokerConfigMap;
}
+
+ public Set<String> getBrokerToMapIn() {
+ return brokerToMapIn;
+ }
+
+ public void setBrokerToMapIn(Set<String> brokerToMapIn) {
+ this.brokerToMapIn = brokerToMapIn;
+ }
+
+ public Set<String> getBrokerToMapOut() {
+ return brokerToMapOut;
+ }
+
+ public void setBrokerToMapOut(Set<String> brokerToMapOut) {
+ this.brokerToMapOut = brokerToMapOut;
+ }
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 472ce0b..656396c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -21,9 +21,9 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.admin.TopicOffset;
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -79,24 +80,129 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
+
+ opt = new Option("f", "mapFile", true, "The map file name");
+ opt.setRequired(false);
+ options.addOption(opt);
return options;
}
+ public void executeFromFile(final CommandLine commandLine, final Options options,
+ RPCHook rpcHook) throws SubCommandException {
+
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ ClientMetadata clientMetadata = new ClientMetadata();
+
+ try {
+ String topic = commandLine.getOptionValue('t').trim();
+
+ String mapFileName = commandLine.getOptionValue('f').trim();
+ String mapData = MixAll.file2String(mapFileName);
+ TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
+ //double check the config
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
+ TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
+
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ if (clusterInfo == null
+ || clusterInfo.getClusterAddrTable().isEmpty()) {
+ throw new RuntimeException("The Cluster info is empty");
+ }
+ clientMetadata.refreshClusterInfo(clusterInfo);
+ doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
+ return;
+ }catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+
+
+
+ public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
+ ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+ // now do the remapping
+ //Step1: let the new leader can be write without the logicOffset
+ for (String broker: brokersToMapIn) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ }
+ //Step2: forbid the write of old leader
+ for (String broker: brokersToMapOut) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ }
+ //Step3: decide the logic offset
+ for (String broker: brokersToMapOut) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
+ TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
+ for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
+ ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+ Integer globalId = entry.getKey();
+ if (items.size() < 2) {
+ continue;
+ }
+ LogicQueueMappingItem newLeader = items.get(items.size() - 1);
+ LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
+ if (newLeader.getLogicOffset() > 0) {
+ continue;
+ }
+ TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
+ if (topicOffset == null) {
+ throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
+ }
+ //TODO check the max offset, will it return -1?
+ if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
+ throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
+ }
+ newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
+ TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
+ //fresh the new leader
+ mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
+ }
+ }
+ //Step4: write to the new leader with logic offset
+ for (String broker: brokersToMapIn) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ }
+ }
+
+
+
+
+
@Override
public void execute(final CommandLine commandLine, final Options options,
- RPCHook rpcHook) throws SubCommandException {
+ RPCHook rpcHook) throws SubCommandException {
+
+ if (!commandLine.hasOption('t')) {
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ return;
+ }
+
+ if (commandLine.hasOption("f")) {
+ executeFromFile(commandLine, options, rpcHook);
+ return;
+ }
+
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
- Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
- Set<String> brokers = new HashSet<>();
- Map.Entry<Long, Integer> maxEpochAndNum = null;
+ Set<String> targetBrokers = new HashSet<>();
+
try {
- if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
- || !commandLine.hasOption('t')) {
+ if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
@@ -108,84 +214,67 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
-
- if (commandLine.hasOption("b")) {
- String brokerStrs = commandLine.getOptionValue("b").trim();
- for (String broker: brokerStrs.split(",")) {
- brokers.add(broker.trim());
- }
- } else if (commandLine.hasOption("c")) {
- String clusters = commandLine.getOptionValue('c').trim();
- for (String cluster : clusters.split(",")) {
- cluster = cluster.trim();
- if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
- brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+ {
+ if (commandLine.hasOption("b")) {
+ String brokerStrs = commandLine.getOptionValue("b").trim();
+ for (String broker: brokerStrs.split(",")) {
+ targetBrokers.add(broker.trim());
+ }
+ } else if (commandLine.hasOption("c")) {
+ String clusters = commandLine.getOptionValue('c').trim();
+ for (String cluster : clusters.split(",")) {
+ cluster = cluster.trim();
+ if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
+ targetBrokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+ }
}
}
- }
- if (brokers.isEmpty()) {
- throw new RuntimeException("Find none brokers");
- }
- for (String broker : brokers) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- if (addr == null) {
- throw new RuntimeException("Can't find addr for broker " + broker);
+ if (targetBrokers.isEmpty()) {
+ throw new RuntimeException("Find none brokers, do nothing");
+ }
+ for (String broker : targetBrokers) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ if (addr == null) {
+ throw new RuntimeException("Can't find addr for broker " + broker);
+ }
}
}
//get the existed topic config and mapping
- TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
- clientMetadata.freshTopicRoute(topic, routeData);
- if (routeData != null
- && !routeData.getQueueDatas().isEmpty()) {
- for (QueueData queueData: routeData.getQueueDatas()) {
- String bname = queueData.getBrokerName();
- String addr = clientMetadata.findMasterBrokerAddr(bname);
- TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
- //allow the config is null
- if (mapping != null) {
- existedTopicConfigMap.put(bname, mapping);
+ {
+ TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ clientMetadata.freshTopicRoute(topic, routeData);
+ if (routeData != null
+ && !routeData.getQueueDatas().isEmpty()) {
+ for (QueueData queueData: routeData.getQueueDatas()) {
+ String bname = queueData.getBrokerName();
+ String addr = clientMetadata.findMasterBrokerAddr(bname);
+ TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+ //allow the config is null
+ if (mapping != null) {
+ brokerConfigMap.put(bname, mapping);
+ }
}
}
}
- if (existedTopicConfigMap.isEmpty()) {
+ if (brokerConfigMap.isEmpty()) {
throw new RuntimeException("No topic route to do the remapping");
}
- //make sure it it not null
- existedTopicConfigMap.forEach((key, value) -> {
- if (value.getMappingDetail() != null) {
- throw new RuntimeException("Mapping info should be null in broker " + key);
- }
- });
- //make sure the detail is not dirty
- existedTopicConfigMap.forEach((key, value) -> {
- if (!key.equals(value.getMappingDetail().getBname())) {
- throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
- }
- if (value.getMappingDetail().isDirty()) {
- throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname());
- }
- });
-
- List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
- //check the epoch and qnum
- maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
- for (TopicQueueMappingDetail mappingDetail : detailList) {
- if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
- throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
- }
- if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
- throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
- }
- }
- globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true);
-
+ final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
long maxEpoch = maxEpochAndNum.getKey();
- TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
+
+ {
+ TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, maxEpoch, new HashMap<>(), brokerConfigMap);
+ String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
+ System.out.println("The old mapping data is written to file " + oldMappingDataFile);
+ }
+
+ TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
@@ -213,6 +302,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
expectedIdToBroker.put(queueId, broker);
}
});
+ long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
//Now construct the remapping info
Set<String> brokersToMapOut = new HashSet<>();
@@ -230,8 +320,8 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
String mapOutBroker = topicQueueMappingOne.getBname();
brokersToMapIn.add(mapInBroker);
brokersToMapOut.add(mapOutBroker);
- TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(mapInBroker);
- TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(mapOutBroker);
+ TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
+ TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
@@ -246,61 +336,24 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
- long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
- existedTopicConfigMap.values().forEach( configMapping -> {
- configMapping.getMappingDetail().setEpoch(epoch);
+ brokerConfigMap.values().forEach(configMapping -> {
+ configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
});
- TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
- // now do the remapping
- //Step1: let the new leader can be write without the logicOffset
- for (String broker: brokersToMapIn) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
- }
- //Step2: forbid the write of old leader
- for (String broker: brokersToMapOut) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
- }
- //Step3: decide the logic offset
- for (String broker: brokersToMapOut) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
- TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(broker);
- for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
- ImmutableList<LogicQueueMappingItem> items = entry.getValue();
- Integer globalId = entry.getKey();
- if (items.size() < 2) {
- continue;
- }
- LogicQueueMappingItem newLeader = items.get(items.size() - 1);
- LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
- if (newLeader.getLogicOffset() > 0) {
- continue;
- }
- TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
- if (topicOffset == null) {
- throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
- }
- //TODO check the max offset, will it return -1?
- if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
- throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
- }
- newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
- TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(newLeader.getBname());
- //fresh the new leader
- mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
- }
- }
- //Step4: write to the new leader with logic offset
- for (String broker: brokersToMapIn) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ //double check
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(brokerConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
+
+ {
+ TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, expectedIdToBroker, brokerConfigMap);
+ newWrapper.setBrokerToMapIn(brokersToMapIn);
+ newWrapper.setBrokerToMapOut(brokersToMapOut);
+ String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
+ System.out.println("The old mapping data is written to file " + newMappingDataFile);
}
+
+ doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt);
+
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 21ce1cf..cdfdcfe 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -39,7 +39,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
-import java.nio.charset.Charset;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
@@ -84,7 +83,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
options.addOption(opt);
opt = new Option("f", "mapFile", true, "The map file name");
- opt.setRequired(true);
+ opt.setRequired(false);
options.addOption(opt);
return options;
@@ -99,12 +98,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
ClientMetadata clientMetadata = new ClientMetadata();
try {
+ String topic = commandLine.getOptionValue('t').trim();
String mapFileName = commandLine.getOptionValue('f').trim();
String mapData = MixAll.file2String(mapFileName);
TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
//double check the config
- TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(wrapper.getBrokerConfigMap());
- TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
+ TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
@@ -131,6 +131,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
+ if (!commandLine.hasOption('t')) {
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ return;
+ }
+
if (commandLine.hasOption("f")) {
executeFromFile(commandLine, options, rpcHook);
return;
@@ -159,7 +164,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
clientMetadata.refreshClusterInfo(clusterInfo);
-
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
@@ -200,8 +204,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
- maxEpochAndNum = TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
- globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+ maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
@@ -213,7 +217,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap);
- String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, "before");
+ String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
@@ -264,12 +268,12 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
configMapping.getMappingDetail().setTotalQueues(queueNum);
});
//double check the config
- TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
- TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+ TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
{
TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap);
- String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, "after");
+ String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
System.out.println("The new mapping data is written to file " + newMappingDataFile);
}