You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 07:14:15 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Finish the remapping command

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new cabc383  Finish the remapping command
cabc383 is described below

commit cabc3838ff68792498631da9e133c711167ff922
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 15:13:41 2021 +0800

    Finish the remapping command
---
 .../rocketmq/common/LogicQueueMappingItem.java     | 18 +++++++
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  6 +++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  9 +++-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  3 ++
 .../topic/RemappingStaticTopicSubCommand.java      | 61 +++++++++++++++++++---
 5 files changed, 88 insertions(+), 9 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index ad0ed96..e05a4d6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -88,4 +88,22 @@ public class LogicQueueMappingItem {
     public long getTimeOfEnd() {
         return timeOfEnd;
     }
+
+    public void setLogicOffset(long logicOffset) {
+        this.logicOffset = logicOffset;
+    }
+
+    @Override
+    public String toString() {
+        return "LogicQueueMappingItem{" +
+                "gen=" + gen +
+                ", queueId=" + queueId +
+                ", bname='" + bname + '\'' +
+                ", logicOffset=" + logicOffset +
+                ", startOffset=" + startOffset +
+                ", endOffset=" + endOffset +
+                ", timeOfStart=" + timeOfStart +
+                ", timeOfEnd=" + timeOfEnd +
+                '}';
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 52eecff..b80f57d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -228,6 +228,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException,
+            MQBrokerException {
+        return defaultMQAdminExtImpl.examineTopicStats(brokerAddr, topic);
+    }
+
+    @Override
     public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
         return this.defaultMQAdminExtImpl.fetchAllTopicList();
     }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 4ddf161..5a2bd2d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -274,6 +274,13 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return topicStatsTable;
     }
 
+
+    @Override
+    public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException,
+            MQBrokerException {
+        return this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis);
+    }
+
     @Override
     public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
@@ -1092,7 +1099,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
         this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
     }
-    
+
     @Override
     public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 15b97db..c4d02f1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -108,6 +108,9 @@ public interface MQAdminExt extends MQAdmin {
         final String topic) throws RemotingException, MQClientException, InterruptedException,
         MQBrokerException;
 
+    TopicStatsTable examineTopicStats(String brokerAddr, final String topic) throws RemotingException, MQClientException, InterruptedException,
+            MQBrokerException;
+
     TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException;
 
     TopicList fetchTopicsByCLuster(
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 34aec17..69f7422 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.tools.command.topic;
 
+import com.alibaba.fastjson.JSON;
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -27,6 +28,9 @@ import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.TopicQueueMappingOne;
 import org.apache.rocketmq.common.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -200,8 +204,8 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
             Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
             Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
             Map<Integer, String> expectedIdToBroker = new HashMap<>();
-            //the following logic will make sure that, for one broker, only "take in" or "take out" queues
-            //It can't, take in some queues but alse take out some queues.
+            //the following logic will make sure that, for one broker, either "map in" or "map out"
+            //It can't both,  map in some queues but also map out some queues.
             globalIdMap.forEach((queueId, mappingOne) -> {
                 String leaderBroker = mappingOne.getBname();
                 if (expectedBrokerNumMap.containsKey(leaderBroker)) {
@@ -251,6 +255,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                 items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
 
                 ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
+                //Use the same object
                 mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
                 mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
             }
@@ -260,13 +265,53 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                 configMapping.getMappingDetail().setEpoch(epoch);
                 configMapping.getMappingDetail().setTotalQueues(maxNum);
             });
-            //decide the new offset
-
-            //If some succeed, and others fail, it will cause inconsistent data
-            for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
-                String broker = entry.getKey();
+            // now do the remapping
+            //Step1: let the new leader can be write without the logicOffset
+            for (String broker: brokersToMapIn) {
+                String addr = clientMetadata.findMasterBrokerAddr(broker);
+                TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
+                defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+            }
+            //Step2: forbid the write of old leader
+            for (String broker: brokersToMapOut) {
+                String addr = clientMetadata.findMasterBrokerAddr(broker);
+                TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
+                defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+            }
+            //Step3: decide the logic offset
+            for (String broker: brokersToMapOut) {
+                String addr = clientMetadata.findMasterBrokerAddr(broker);
+                TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
+                TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(broker);
+                for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
+                    ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+                    Integer globalId = entry.getKey();
+                    if (items.size() < 2) {
+                        continue;
+                    }
+                    LogicQueueMappingItem newLeader = items.get(items.size() - 1);
+                    LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
+                    if (newLeader.getLogicOffset()  > 0) {
+                        continue;
+                    }
+                    TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
+                    if (topicOffset == null) {
+                        throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
+                    }
+                    //TODO check the max offset, will it return -1?
+                    if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
+                        throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
+                    }
+                    newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
+                    TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(newLeader.getBname());
+                    //fresh the new leader
+                    mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
+                }
+            }
+            //Step4: write to the new leader with logic offset
+            for (String broker: brokersToMapIn) {
                 String addr = clientMetadata.findMasterBrokerAddr(broker);
-                TopicConfigAndQueueMapping configMapping = entry.getValue();
+                TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
                 defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
             }
         } catch (Exception e) {