You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/20 05:20:54 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the use of route data

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new daf4749  Polish the use of route data
daf4749 is described below

commit daf47490c3e26720762534462c41a77b15e97ea2
Author: dongeforever <do...@apache.org>
AuthorDate: Sat Nov 20 13:20:39 2021 +0800

    Polish the use of route data
---
 .../apache/rocketmq/common/rpc/ClientMetadata.java | 48 ++++++++++++++--------
 .../common/statictopic/LogicQueueMappingItem.java  | 16 ++++----
 .../statictopic/TopicQueueMappingDetail.java       | 13 +-----
 .../common/statictopic/TopicQueueMappingInfo.java  |  7 ----
 .../namesrv/routeinfo/RouteInfoManager.java        |  1 +
 5 files changed, 43 insertions(+), 42 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 38f1d51..53ffa51 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -10,8 +10,11 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -93,33 +96,46 @@ public class ClientMetadata {
                 || route.getTopicQueueMappingByBroker().isEmpty()) {
             return new ConcurrentHashMap<MessageQueue, String>();
         }
-        ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<MessageQueue, String>();
-
-        int totalNums = 0;
-        for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
-            String brokerName = entry.getKey();
-            //TODO check the epoch of
-            if (entry.getValue().getTotalQueues() > totalNums) {
-                if (totalNums != 0) {
-                    log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
-                }
-                totalNums = entry.getValue().getTotalQueues();
+        ConcurrentMap<MessageQueue, TopicQueueMappingInfo> mqEndPoints = new ConcurrentHashMap<MessageQueue, TopicQueueMappingInfo>();
+
+
+        List<Map.Entry<String, TopicQueueMappingInfo>> mappingInfos = new ArrayList<Map.Entry<String, TopicQueueMappingInfo>>(route.getTopicQueueMappingByBroker().entrySet());
+        Collections.sort(mappingInfos, new Comparator<Map.Entry<String, TopicQueueMappingInfo>>() {
+            @Override
+            public int compare(Map.Entry<String, TopicQueueMappingInfo> o1, Map.Entry<String, TopicQueueMappingInfo> o2) {
+                return  (int) (o2.getValue().getEpoch() - o1.getValue().getEpoch());
+            }
+        });
+
+        int maxTotalNums = 0;
+        long maxTotalNumOfEpoch = -1;
+        for (Map.Entry<String, TopicQueueMappingInfo> entry : mappingInfos) {
+            TopicQueueMappingInfo info = entry.getValue();
+            if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) {
+                maxTotalNums = entry.getValue().getTotalQueues();
             }
             for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
                 int globalId = idEntry.getKey();
                 MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
-                String oldBrokerName = mqEndPoints.put(mq, brokerName);
-                log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
+                TopicQueueMappingInfo oldInfo = mqEndPoints.get(mq);
+                if (oldInfo == null ||  oldInfo.getEpoch() <= info.getEpoch()) {
+                    mqEndPoints.put(mq, info);
+                }
             }
         }
+
+        ConcurrentMap<MessageQueue, String> mqEndPointsOfBroker = new ConcurrentHashMap<MessageQueue, String>();
+
         //accomplish the static logic queues
-        for (int i = 0; i < totalNums; i++) {
+        for (int i = 0; i < maxTotalNums; i++) {
             MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
             if (!mqEndPoints.containsKey(mq)) {
-                mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
+                mqEndPointsOfBroker.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
+            } else {
+                mqEndPointsOfBroker.put(mq, mqEndPoints.get(mq).getBname());
             }
         }
-        return mqEndPoints;
+        return mqEndPointsOfBroker;
     }
 
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index c855dfd..479f75d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -2,14 +2,14 @@ package org.apache.rocketmq.common.statictopic;
 
 public class LogicQueueMappingItem {
 
-    private int gen; //generation, mutable
-    private int queueId;
-    private String bname;
-    private long logicOffset; // the start of the logic offset
-    private long startOffset; // the start of the physical offset, included
-    private long endOffset = -1; // the end of the physical offset, excluded
-    private long timeOfStart = -1; // mutable
-    private long timeOfEnd = -1; // mutable
+    private final int gen; // immutable
+    private final int queueId; // immutable
+    private final String bname; //important, immutable
+    private long logicOffset; // the start of the logic offset, important, can be changed by command only once
+    private final long startOffset; // the start of the physical offset, should always be 0, immutable
+    private long endOffset = -1; // the end of the physical offset, excluded, reserved -1, mutable
+    private long timeOfStart = -1; // mutable, reserved
+    private long timeOfEnd = -1; // mutable, reserved
 
     public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) {
         this.gen = gen;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index c941fff..7117cad 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -45,14 +45,13 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
 
     public void buildIdMap() {
         this.currIdMap = buildIdMap(LEVEL_0);
-        this.prevIdMap = buildIdMap(LEVEL_1);
     }
 
 
     public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
         //level 0 means current leader in this broker
-        //level 1 means previous leader in this broker
-        assert level == LEVEL_0 || level == LEVEL_1;
+        //level 1 means previous leader in this broker; reserved, only level 0 is accepted for now
+        assert level == LEVEL_0 ;
 
         if (hostedQueues == null || hostedQueues.isEmpty()) {
             return new ConcurrentHashMap<Integer, Integer>();
@@ -67,12 +66,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
                 if (bname.equals(curr.getBname())) {
                     tmpIdMap.put(globalId, curr.getQueueId());
                 }
-            } else if (level == LEVEL_1
-                    && items.size() >= 2) {
-                LogicQueueMappingItem prev = items.get(items.size() - 1);
-                if (bname.equals(prev.getBname())) {
-                    tmpIdMap.put(globalId, prev.getQueueId());
-                }
             }
         }
         return tmpIdMap;
@@ -120,8 +113,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
     public TopicQueueMappingInfo cloneAsMappingInfo() {
         TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch);
         topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
-        topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
-
         return topicQueueMappingInfo;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index b6ce222..f6122c0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap;
 
 public class TopicQueueMappingInfo extends RemotingSerializable {
     public static final int LEVEL_0 = 0;
-    public static final int LEVEL_1 = 1;
 
     String topic; // redundant field
     int totalQueues;
@@ -32,8 +31,6 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     boolean dirty; //indicate if the data is dirty
     //register to broker to construct the route
     transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
-    //register to broker to help detect remapping failure
-    transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
 
     public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) {
         this.topic = topic;
@@ -79,8 +76,4 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     public ConcurrentMap<Integer, Integer> getCurrIdMap() {
         return currIdMap;
     }
-
-    public ConcurrentMap<Integer, Integer> getPrevIdMap() {
-        return prevIdMap;
-    }
 }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index b210857..60a0f81 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -168,6 +168,7 @@ public class RouteInfoManager {
                             if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                                 topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());
                             }
+                            //Note: assert brokerName equals entry.getValue().getBname()
                             topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                         }
                     }