You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/07 15:48:29 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Finish the register process in namesrv

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 7707275  Finish the register process in namesrv
7707275 is described below

commit 7707275e37349a4ce6ed4a32547683196de17ff8
Author: dongeforever <do...@apache.org>
AuthorDate: Sun Nov 7 23:48:13 2021 +0800

    Finish the register process in namesrv
---
 .../apache/rocketmq/broker/BrokerController.java   |  4 +-
 .../broker/processor/AdminBrokerProcessor.java     |  6 +-
 .../broker/topic/TopicQueueMappingManager.java     | 12 ++--
 .../common/TopicConfigAndQueueMapping.java         | 10 +--
 ...ppingInfo.java => TopicQueueMappingDetail.java} | 24 ++-----
 .../rocketmq/common/TopicQueueMappingInfo.java     | 79 ++++------------------
 .../common/protocol/body/RegisterBrokerBody.java   |  1 +
 .../protocol/body/TopicQueueMappingBody.java       |  4 +-
 .../body/TopicQueueMappingSerializeWrapper.java    |  8 +--
 .../header/namesrv/GetRouteInfoRequestHeader.java  | 20 ------
 .../common/protocol/route/TopicRouteData.java      | 27 ++++----
 .../processor/ClusterTestRequestProcessor.java     |  2 +-
 .../namesrv/processor/DefaultRequestProcessor.java | 19 +-----
 .../namesrv/routeinfo/RouteInfoManager.java        | 74 +++++---------------
 .../processor/DefaultRequestProcessorTest.java     |  4 +-
 15 files changed, 80 insertions(+), 214 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 926c43b..e08dbbe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1084,7 +1084,7 @@ public class BrokerController {
         Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
             .map(TopicConfig::getTopicName)
             .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
-                .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.clone4register()))
+                .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.cloneAsMappingInfo()))
                 .orElse(null))
             .filter(Objects::nonNull)
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
@@ -1103,7 +1103,7 @@ public class BrokerController {
         topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
 
         topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
-                entry ->  new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().clone4register())
+                entry ->  new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().cloneAsMappingInfo())
         ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 99c7031..6bd742c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1726,11 +1726,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
             return response;
         }
-        TopicQueueMappingInfo topicQueueMappingInfo = null;
+        TopicQueueMappingDetail topicQueueMappingDetail = null;
         if (Boolean.TRUE.equals(requestHeader.getWithMapping())) {
-            topicQueueMappingInfo = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
+            topicQueueMappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
         }
-        String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingInfo));
+        String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingDetail));
         try {
             response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
         } catch (UnsupportedEncodingException e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index da7ea8f..2f5d558 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
-import org.apache.rocketmq.common.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
@@ -41,18 +41,18 @@ public class TopicQueueMappingManager extends ConfigManager {
     private final DataVersion dataVersion = new DataVersion();
     private transient BrokerController brokerController;
 
-    private final ConcurrentMap<String, TopicQueueMappingInfo> topicQueueMappingTable = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, TopicQueueMappingDetail> topicQueueMappingTable = new ConcurrentHashMap<>();
 
 
     public TopicQueueMappingManager(BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
-    public void updateTopicQueueMapping(TopicQueueMappingInfo topicQueueMappingInfo) {
-        topicQueueMappingTable.put(topicQueueMappingInfo.getTopic(), topicQueueMappingInfo);
+    public void updateTopicQueueMapping(TopicQueueMappingDetail topicQueueMappingDetail) {
+        topicQueueMappingTable.put(topicQueueMappingDetail.getTopic(), topicQueueMappingDetail);
     }
 
-    public TopicQueueMappingInfo getTopicQueueMapping(String topic) {
+    public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
         return topicQueueMappingTable.get(topic);
     }
 
@@ -86,7 +86,7 @@ public class TopicQueueMappingManager extends ConfigManager {
         }
     }
 
-    public ConcurrentMap<String, TopicQueueMappingInfo> getTopicQueueMappingTable() {
+    public ConcurrentMap<String, TopicQueueMappingDetail> getTopicQueueMappingTable() {
         return topicQueueMappingTable;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
index 7b1ea40..e7fef8d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
@@ -18,15 +18,15 @@ package org.apache.rocketmq.common;
 
 public class TopicConfigAndQueueMapping {
     private TopicConfig topicConfig;
-    private TopicQueueMappingInfo topicQueueMappingInfo;
+    private TopicQueueMappingDetail topicQueueMappingDetail;
 
-    public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingInfo topicQueueMappingInfo) {
+    public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingDetail topicQueueMappingDetail) {
         this.topicConfig = topicConfig;
-        this.topicQueueMappingInfo = topicQueueMappingInfo;
+        this.topicQueueMappingDetail = topicQueueMappingDetail;
     }
 
-    public TopicQueueMappingInfo getTopicQueueMappingInfo() {
-        return topicQueueMappingInfo;
+    public TopicQueueMappingDetail getTopicQueueMappingInfo() {
+        return topicQueueMappingDetail;
     }
 
     public TopicConfig getTopicConfig() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
similarity index 77%
copy from common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
copy to common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 85498df..a90ca7e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -17,31 +17,19 @@
 package org.apache.rocketmq.common;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-public class TopicQueueMappingInfo extends RemotingSerializable {
-    public static final int LEVEL_0 = 0;
-    public static final int LEVEL_1 = 1;
+public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
 
-    private String topic; // redundant field
-    private int totalQueues;
-    private String bname;  //identify the hosted broker name
     // the mapping info in current broker, do not register to nameserver
-    private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
-    //register to broker to construct the route
-    private ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
-    //register to broker to help detect remapping failure
-    private ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
-
-    public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
-        this.topic = topic;
-        this.totalQueues = totalQueues;
-        this.bname = bname;
+    ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+
+    public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
+        super(topic, totalQueues, bname);
         buildIdMap();
     }
 
@@ -93,7 +81,7 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     }
 
 
-    public TopicQueueMappingInfo  clone4register() {
+    public TopicQueueMappingInfo cloneAsMappingInfo() {
         TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
         topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
         topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 85498df..c5bbeef 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -16,11 +16,8 @@
  */
 package org.apache.rocketmq.common;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -28,78 +25,20 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     public static final int LEVEL_0 = 0;
     public static final int LEVEL_1 = 1;
 
-    private String topic; // redundant field
-    private int totalQueues;
-    private String bname;  //identify the hosted broker name
-    // the mapping info in current broker, do not register to nameserver
-    private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+    String topic; // redundant field
+    int totalQueues;
+    String bname;  //identify the hosted broker name
     //register to broker to construct the route
-    private ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
+    ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
     //register to broker to help detect remapping failure
-    private ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
+    protected ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
 
     public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
         this.topic = topic;
         this.totalQueues = totalQueues;
         this.bname = bname;
-        buildIdMap();
     }
 
-    public boolean putMappingInfo(Integer globalId, ImmutableList<LogicQueueMappingItem> mappingInfo) {
-        if (mappingInfo.isEmpty()) {
-            return true;
-        }
-        hostedQueues.put(globalId, mappingInfo);
-        buildIdMap();
-        return true;
-    }
-
-    public void buildIdMap() {
-        this.currIdMap = buildIdMap(LEVEL_0);
-        this.prevIdMap = buildIdMap(LEVEL_1);
-    }
-
-    public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
-        //level 0 means current leader in this broker
-        //level 1 means previous leader in this broker
-        assert level == LEVEL_0 || level == LEVEL_1;
-
-        if (hostedQueues == null || hostedQueues.isEmpty()) {
-            return new ConcurrentHashMap<Integer, Integer>();
-        }
-        ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
-        for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry: hostedQueues.entrySet()) {
-            Integer globalId =  entry.getKey();
-            ImmutableList<LogicQueueMappingItem> items = entry.getValue();
-            if (level == LEVEL_0
-                    && items.size() >= 1) {
-                LogicQueueMappingItem curr = items.get(items.size() - 1);
-                if (bname.equals(curr.getBname())) {
-                    tmpIdMap.put(curr.getQueueId(), globalId);
-                }
-            } else if (level == LEVEL_1
-                    && items.size() >= 2) {
-                LogicQueueMappingItem prev = items.get(items.size() - 1);
-                if (bname.equals(prev.getBname())) {
-                    tmpIdMap.put(prev.getQueueId(), globalId);
-                }
-            }
-        }
-        return tmpIdMap;
-    }
-
-    public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
-        return hostedQueues.get(globalId);
-    }
-
-
-    public TopicQueueMappingInfo  clone4register() {
-        TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
-        topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
-        topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
-
-        return topicQueueMappingInfo;
-    }
 
     public int getTotalQueues() {
         return totalQueues;
@@ -116,4 +55,12 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     public String getTopic() {
         return topic;
     }
+
+    public ConcurrentMap<Integer, Integer> getCurrIdMap() {
+        return currIdMap;
+    }
+
+    public ConcurrentMap<Integer, Integer> getPrevIdMap() {
+        return prevIdMap;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index 2d7597f..ea84ea1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -33,6 +33,7 @@ import java.util.zip.InflaterInputStream;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
index 4caba89..edb1d34 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
@@ -16,9 +16,9 @@
  */
 package org.apache.rocketmq.common.protocol.body;
 
-import org.apache.rocketmq.common.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
 
-public class TopicQueueMappingBody extends TopicQueueMappingInfo {
+public class TopicQueueMappingBody extends TopicQueueMappingDetail {
 
     public TopicQueueMappingBody(String topic, int totalQueues, String bname) {
         super(topic, totalQueues, bname);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
index 1d3d6c5..317ccd7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
@@ -18,20 +18,20 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.Map;
 
 public class TopicQueueMappingSerializeWrapper extends RemotingSerializable {
-    private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap;
+    private Map<String/* topic */, TopicQueueMappingDetail> topicQueueMappingInfoMap;
     private DataVersion dataVersion = new DataVersion();
 
-    public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
+    public Map<String, TopicQueueMappingDetail> getTopicQueueMappingInfoMap() {
         return topicQueueMappingInfoMap;
     }
 
-    public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap) {
+    public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingDetail> topicQueueMappingInfoMap) {
         this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
index ad776c8..a2806e6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
@@ -20,7 +20,6 @@
  */
 package org.apache.rocketmq.common.protocol.header.namesrv;
 
-import java.util.Set;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -29,9 +28,6 @@ public class GetRouteInfoRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String topic;
 
-    private int sysFlag;
-    private Set<Integer> logicalQueueIdsFilter;
-
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -43,20 +39,4 @@ public class GetRouteInfoRequestHeader implements CommandCustomHeader {
     public void setTopic(String topic) {
         this.topic = topic;
     }
-
-    public int getSysFlag() {
-        return sysFlag;
-    }
-
-    public void setSysFlag(int sysFlag) {
-        this.sysFlag = sysFlag;
-    }
-
-    public void setLogicalQueueIdsFilter(Set<Integer> filter) {
-        this.logicalQueueIdsFilter = filter;
-    }
-
-    public Set<Integer> getLogicalQueueIdsFilter() {
-        return logicalQueueIdsFilter;
-    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
index 4470a2a..c2cad6c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
@@ -23,6 +23,9 @@ package org.apache.rocketmq.common.protocol.route;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class TopicRouteData extends RemotingSerializable {
@@ -30,7 +33,7 @@ public class TopicRouteData extends RemotingSerializable {
     private List<QueueData> queueDatas;
     private List<BrokerData> brokerDatas;
     private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
-    private LogicalQueuesInfo logicalQueuesInfo;
+    private Map<String, TopicQueueMappingInfo> topicQueueMappingByBroker;
 
     public TopicRouteData() {
     }
@@ -53,8 +56,8 @@ public class TopicRouteData extends RemotingSerializable {
             this.filterServerTable.putAll(topicRouteData.filterServerTable);
         }
 
-        if (topicRouteData.logicalQueuesInfo != null) {
-            this.logicalQueuesInfo = new LogicalQueuesInfo(topicRouteData.logicalQueuesInfo);
+        if (topicRouteData.topicQueueMappingByBroker != null) {
+            this.topicQueueMappingByBroker = new HashMap<String, TopicQueueMappingInfo>(topicRouteData.topicQueueMappingByBroker);
         }
     }
 
@@ -90,12 +93,12 @@ public class TopicRouteData extends RemotingSerializable {
         this.orderTopicConf = orderTopicConf;
     }
 
-    public LogicalQueuesInfo getLogicalQueuesInfo() {
-        return logicalQueuesInfo;
+    public Map<String, TopicQueueMappingInfo> getTopicQueueMappingByBroker() {
+        return topicQueueMappingByBroker;
     }
 
-    public void setLogicalQueuesInfo(LogicalQueuesInfo logicalQueuesInfo) {
-        this.logicalQueuesInfo = logicalQueuesInfo;
+    public void setTopicQueueMappingByBroker(Map<String, TopicQueueMappingInfo> topicQueueMappingByBroker) {
+        this.topicQueueMappingByBroker = topicQueueMappingByBroker;
     }
 
     @Override
@@ -106,7 +109,7 @@ public class TopicRouteData extends RemotingSerializable {
         result = prime * result + ((orderTopicConf == null) ? 0 : orderTopicConf.hashCode());
         result = prime * result + ((queueDatas == null) ? 0 : queueDatas.hashCode());
         result = prime * result + ((filterServerTable == null) ? 0 : filterServerTable.hashCode());
-        result = prime * result + ((logicalQueuesInfo == null) ? 0 : logicalQueuesInfo.hashCode());
+        result = prime * result + ((topicQueueMappingByBroker == null) ? 0 : topicQueueMappingByBroker.hashCode());
         return result;
     }
 
@@ -139,10 +142,10 @@ public class TopicRouteData extends RemotingSerializable {
                 return false;
         } else if (!filterServerTable.equals(other.filterServerTable))
             return false;
-        if (logicalQueuesInfo == null) {
-            if (other.logicalQueuesInfo != null)
+        if (topicQueueMappingByBroker == null) {
+            if (other.topicQueueMappingByBroker != null)
                 return false;
-        } else if (!logicalQueuesInfo.equals(other.logicalQueuesInfo))
+        } else if (!topicQueueMappingByBroker.equals(other.topicQueueMappingByBroker))
             return false;
         return true;
     }
@@ -150,6 +153,6 @@ public class TopicRouteData extends RemotingSerializable {
     @Override
     public String toString() {
         return "TopicRouteData [orderTopicConf=" + orderTopicConf + ", queueDatas=" + queueDatas
-            + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + ", logicalQueuesInfo=" + logicalQueuesInfo + "]";
+            + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + ", topicQueueMappingInfoTable=" + topicQueueMappingByBroker + "]";
     }
 }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
index dd15288..a58a3b9 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
@@ -56,7 +56,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
         final GetRouteInfoRequestHeader requestHeader =
             (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
 
-        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), false);
+        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
         if (topicRouteData != null) {
             String orderTopicConf =
                 this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 85e3883..98e96df 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.namesrv.processor;
 import io.netty.channel.ChannelHandlerContext;
 import java.io.UnsupportedEncodingException;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MQVersion;
@@ -53,9 +52,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponse
 import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
-import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
-import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -353,9 +350,7 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
         final GetRouteInfoRequestHeader requestHeader =
             (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
 
-        boolean includeLogicalQueuesInfo = (requestHeader.getSysFlag() & MessageSysFlag.LOGICAL_QUEUE_FLAG) > 0;
-
-        TopicRouteDataNameSrv topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), includeLogicalQueuesInfo);
+        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
 
         if (topicRouteData != null) {
             if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
@@ -365,16 +360,6 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
                 topicRouteData.setOrderTopicConf(orderTopicConf);
             }
 
-            Set<Integer> logicalQueueIdsFilter = requestHeader.getLogicalQueueIdsFilter();
-            if (logicalQueueIdsFilter != null) {
-                LogicalQueuesInfoUnordered logicalQueuesInfo = topicRouteData.getLogicalQueuesInfoUnordered();
-                if (logicalQueuesInfo != null) {
-                    LogicalQueuesInfoUnordered filtered = new LogicalQueuesInfoUnordered(logicalQueueIdsFilter.size());
-                    logicalQueueIdsFilter.forEach(integer -> filtered.put(integer, logicalQueuesInfo.get(integer)));
-                    topicRouteData.setLogicalQueuesInfoUnordered(filtered);
-                }
-            }
-
             byte[] content = topicRouteData.encode();
             response.setBody(content);
             response.setCode(ResponseCode.SUCCESS);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index ab9e77f..bfdf53c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.namesrv.routeinfo;
 
 import io.netty.channel.Channel;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -25,19 +24,18 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -45,13 +43,10 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
 import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
 
 public class RouteInfoManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
@@ -62,7 +57,8 @@ public class RouteInfoManager {
     private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
     private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
     private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
-    private final ConcurrentMap<String/* topic */, LogicalQueuesInfoUnordered> logicalQueuesInfoTable;
+    private final HashMap<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
+
 
     public RouteInfoManager() {
         this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
@@ -70,7 +66,7 @@ public class RouteInfoManager {
         this.clusterAddrTable = new HashMap<String, Set<String>>(32);
         this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
         this.filterServerTable = new HashMap<String, List<String>>(256);
-        this.logicalQueuesInfoTable = new ConcurrentHashMap<>(1024);
+        this.topicQueueMappingInfoTable = new HashMap<String, Map<String, TopicQueueMappingInfo>>(1024);
     }
 
     public byte[] getAllClusterInfo() {
@@ -158,20 +154,21 @@ public class RouteInfoManager {
                         || registerFirst) {
                         ConcurrentMap<String, TopicConfig> tcTable =
                             topicConfigWrapper.getTopicConfigTable();
-                        Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigWrapper.getLogicalQueuesInfoMap();
                         if (tcTable != null) {
                             for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                 this.createAndUpdateQueueData(brokerName, entry.getValue());
                             }
                         }
-                        if (logicalQueuesInfoMap != null) {
-                            long startTime = System.nanoTime();
-                            for (Map.Entry<String, LogicalQueuesInfo> entry : logicalQueuesInfoMap.entrySet()) {
-                                String topicName = entry.getKey();
-                                LogicalQueuesInfoUnordered logicalQueuesInfo = ConcurrentHashMapUtil.computeIfAbsent(this.logicalQueuesInfoTable, topicName, ignore -> new LogicalQueuesInfoUnordered());
-                                mergeLogicalQueuesInfo(brokerName, topicName, logicalQueuesInfo, entry.getValue());
+
+                        TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
+
+                        Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
+
+                        for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
+                            if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
+                                topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());
                             }
-                            log.debug("mergeQueueRouteDataTable topic={} time={}ns", System.nanoTime() - startTime);
+                            topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                         }
                     }
                 }
@@ -403,12 +400,8 @@ public class RouteInfoManager {
         }
     }
 
-    public TopicRouteDataNameSrv pickupTopicRouteData(final String topic) {
-        return pickupTopicRouteData(topic, false);
-    }
-
-    public TopicRouteDataNameSrv pickupTopicRouteData(final String topic, boolean includeLogicalQueuesInfo) {
-        TopicRouteDataNameSrv topicRouteData = new TopicRouteDataNameSrv();
+    public TopicRouteData pickupTopicRouteData(final String topic) {
+        TopicRouteData topicRouteData = new TopicRouteData();
         boolean foundQueueData = false;
         boolean foundBrokerData = false;
         Set<String> brokerNameSet = new HashSet<String>();
@@ -417,6 +410,7 @@ public class RouteInfoManager {
 
         HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
         topicRouteData.setFilterServerTable(filterServerMap);
+        topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
 
         try {
             try {
@@ -456,10 +450,6 @@ public class RouteInfoManager {
         log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
 
         if (foundBrokerData && foundQueueData) {
-            if (includeLogicalQueuesInfo) {
-                topicRouteData.setLogicalQueuesInfoUnordered(logicalQueuesInfoTable.get(topic));
-            }
-
             return topicRouteData;
         }
 
@@ -790,34 +780,6 @@ public class RouteInfoManager {
 
         return topicList.encode();
     }
-
-    private static void mergeLogicalQueuesInfo(String brokerName, String topicName,
-        LogicalQueuesInfoUnordered logicalQueuesInfoInNamesrv,
-        LogicalQueuesInfo logicalQueuesInfoFromBroker) {
-        Set<LogicalQueuesInfoUnordered.Key> newKeys = logicalQueuesInfoFromBroker.values()
-            .stream()
-            .flatMap(Collection::stream)
-            .filter(v -> Objects.equals(brokerName, v.getBrokerName()))
-            .map(v -> new LogicalQueuesInfoUnordered.Key(null, v.getQueueId(), v.getOffsetDelta()))
-            .collect(Collectors.toSet());
-        logicalQueuesInfoInNamesrv.values().forEach(m ->
-            m.values().removeIf(queueRouteData ->
-                Objects.equals(brokerName, queueRouteData.getBrokerName()) &&
-                    !newKeys.contains(new LogicalQueuesInfoUnordered.Key(null, queueRouteData.getQueueId(), queueRouteData.getOffsetDelta()))));
-        logicalQueuesInfoFromBroker.forEach((logicalQueueId, queueRouteDataListFromBroker) -> {
-            if (logicalQueueId == null) {
-                log.warn("queueRouteDataTable topic {} contains null logicalQueueId: {}", topicName, logicalQueuesInfoFromBroker);
-                return;
-            }
-            queueRouteDataListFromBroker.stream()
-                .filter(queueRouteDataFromBroker -> Objects.equals(brokerName, queueRouteDataFromBroker.getBrokerName()))
-                .forEach(queueRouteDataFromBroker ->
-                    ConcurrentHashMapUtil.computeIfAbsent(logicalQueuesInfoInNamesrv, logicalQueueId, ignored -> new ConcurrentHashMap<>(queueRouteDataListFromBroker.size()))
-                        .put(new LogicalQueuesInfoUnordered.Key(brokerName, queueRouteDataFromBroker.getQueueId(), queueRouteDataFromBroker.getOffsetDelta()),
-                            queueRouteDataFromBroker)
-                );
-        });
-    }
 }
 
 class BrokerLiveInfo {
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
index 97e6126..38bff74 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
@@ -200,7 +200,7 @@ public class DefaultRequestProcessorTest {
             .contains(new HashMap.SimpleEntry("broker", broker));
     }
 
-    @Test
+    /*@Test
     public void testProcessRequest_RegisterBrokerLogicalQueue() throws Exception {
         String cluster = "cluster";
         String broker1Name = "broker1";
@@ -299,7 +299,7 @@ public class DefaultRequestProcessorTest {
             assertThat(topicRouteDataNameSrv.getLogicalQueuesInfoUnordered()).isEqualTo(logicalQueuesInfoUnordered);
         }
     }
-
+*/
     @Test
     public void testProcessRequest_RegisterBrokerWithFilterServer() throws RemotingCommandException,
         NoSuchFieldException, IllegalAccessException {