You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 07:14:15 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Finish the remapping command
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new cabc383 Finish the remapping command
cabc383 is described below
commit cabc3838ff68792498631da9e133c711167ff922
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 15:13:41 2021 +0800
Finish the remapping command
---
.../rocketmq/common/LogicQueueMappingItem.java | 18 +++++++
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 6 +++
.../tools/admin/DefaultMQAdminExtImpl.java | 9 +++-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 3 ++
.../topic/RemappingStaticTopicSubCommand.java | 61 +++++++++++++++++++---
5 files changed, 88 insertions(+), 9 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index ad0ed96..e05a4d6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -88,4 +88,22 @@ public class LogicQueueMappingItem {
public long getTimeOfEnd() {
return timeOfEnd;
}
+
+ public void setLogicOffset(long logicOffset) {
+ this.logicOffset = logicOffset;
+ }
+
+ @Override
+ public String toString() {
+ return "LogicQueueMappingItem{" +
+ "gen=" + gen +
+ ", queueId=" + queueId +
+ ", bname='" + bname + '\'' +
+ ", logicOffset=" + logicOffset +
+ ", startOffset=" + startOffset +
+ ", endOffset=" + endOffset +
+ ", timeOfStart=" + timeOfStart +
+ ", timeOfEnd=" + timeOfEnd +
+ '}';
+ }
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 52eecff..b80f57d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -228,6 +228,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
+ public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException {
+ return defaultMQAdminExtImpl.examineTopicStats(brokerAddr, topic);
+ }
+
+ @Override
public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
return this.defaultMQAdminExtImpl.fetchAllTopicList();
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 4ddf161..5a2bd2d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -274,6 +274,13 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return topicStatsTable;
}
+
+ @Override
+ public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException {
+ return this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis);
+ }
+
@Override
public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
@@ -1092,7 +1099,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
}
-
+
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 15b97db..c4d02f1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -108,6 +108,9 @@ public interface MQAdminExt extends MQAdmin {
final String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException;
+ TopicStatsTable examineTopicStats(String brokerAddr, final String topic) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException;
+
TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException;
TopicList fetchTopicsByCLuster(
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 34aec17..69f7422 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.tools.command.topic;
+import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -27,6 +28,9 @@ import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.TopicQueueMappingOne;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -200,8 +204,8 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
Map<Integer, String> expectedIdToBroker = new HashMap<>();
- //the following logic will make sure that, for one broker, only "take in" or "take out" queues
- //It can't, take in some queues but alse take out some queues.
+ //the following logic makes sure that each broker either only "maps in" or only "maps out" queues.
+ //It can't do both, i.e. map in some queues while also mapping out others.
globalIdMap.forEach((queueId, mappingOne) -> {
String leaderBroker = mappingOne.getBname();
if (expectedBrokerNumMap.containsKey(leaderBroker)) {
@@ -251,6 +255,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
+ //Use the same list object for both the map-in and the map-out config
mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
@@ -260,13 +265,53 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
});
- //decide the new offset
-
- //If some succeed, and others fail, it will cause inconsistent data
- for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
- String broker = entry.getKey();
+ // now do the remapping
+ //Step1: make the new leader writable without the logicOffset
+ for (String broker: brokersToMapIn) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ }
+ //Step2: forbid writes to the old leader
+ for (String broker: brokersToMapOut) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ }
+ //Step3: decide the logic offset
+ for (String broker: brokersToMapOut) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
+ TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(broker);
+ for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
+ ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+ Integer globalId = entry.getKey();
+ if (items.size() < 2) {
+ continue;
+ }
+ LogicQueueMappingItem newLeader = items.get(items.size() - 1);
+ LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
+ if (newLeader.getLogicOffset() > 0) {
+ continue;
+ }
+ TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
+ if (topicOffset == null) {
+ throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
+ }
+ //TODO check the max offset, will it return -1?
+ if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
+ throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
+ }
+ newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
+ TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(newLeader.getBname());
+ //refresh the mapping info held by the new leader
+ mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
+ }
+ }
+ //Step4: write to the new leader with logic offset
+ for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = entry.getValue();
+ TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
} catch (Exception e) {