You are viewing a plain text version of this content. The canonical link for it is not preserved in this plain-text version.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/05/05 07:37:11 UTC

[rocketmq] branch 5.0.0-beta updated: [ISSUE #4245] Remove the topic route cache in nameserver

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new d937c1d61 [ISSUE #4245] Remove the topic route cache in nameserver
d937c1d61 is described below

commit d937c1d6158ba36a3836cd39dbbd5c96f4d9d797
Author: rongtong <ji...@163.com>
AuthorDate: Thu May 5 15:36:48 2022 +0800

    [ISSUE #4245] Remove the topic route cache in nameserver
    
    [ISSUE #4245] Remove the topic route cache in nameserver
---
 .../rocketmq/common/namesrv/NamesrvConfig.java     |  33 ++--
 .../rocketmq/common/protocol/route/BrokerData.java |   8 +
 .../apache/rocketmq/namesrv/NamesrvController.java |   1 -
 .../namesrv/processor/ClientRequestProcessor.java  |   8 +-
 .../namesrv/processor/DefaultRequestProcessor.java |  68 +++++---
 .../namesrv/routeinfo/RouteInfoManager.java        | 184 ++++++++-------------
 6 files changed, 151 insertions(+), 151 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index 002c9569f..698a0582b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -22,12 +22,8 @@ package org.apache.rocketmq.common.namesrv;
 
 import java.io.File;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 public class NamesrvConfig {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
 
     private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
     private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
@@ -36,7 +32,6 @@ public class NamesrvConfig {
     private boolean clusterTest = false;
     private boolean orderMessageEnable = false;
     private boolean returnOrderTopicConfigToBroker = true;
-    private volatile boolean remoteFaultTolerance = true;
 
     /**
      * Indicates the nums of thread to handle client requests, like GET_ROUTEINTO_BY_TOPIC.
@@ -71,15 +66,13 @@ public class NamesrvConfig {
      */
     private boolean supportActingMaster = false;
 
-    private volatile boolean enableAllTopicList = false;
+    private volatile boolean enableAllTopicList = true;
 
-    public void setRemoteFaultTolerance(boolean remoteFaultTolerance) {
-        this.remoteFaultTolerance = remoteFaultTolerance;
-    }
 
-    public boolean isRemoteFaultTolerance() {
-        return remoteFaultTolerance;
-    }
+    private volatile boolean enableTopicList = true;
+
+    private volatile boolean notifyMinBrokerIdChanged = true;
+
 
     public boolean isOrderMessageEnable() {
         return orderMessageEnable;
@@ -200,4 +193,20 @@ public class NamesrvConfig {
     public void setEnableAllTopicList(boolean enableAllTopicList) {
         this.enableAllTopicList = enableAllTopicList;
     }
+
+    public boolean isEnableTopicList() {
+        return enableTopicList;
+    }
+
+    public void setEnableTopicList(boolean enableTopicList) {
+        this.enableTopicList = enableTopicList;
+    }
+
+    public boolean isNotifyMinBrokerIdChanged() {
+        return notifyMinBrokerIdChanged;
+    }
+
+    public void setNotifyMinBrokerIdChanged(boolean notifyMinBrokerIdChanged) {
+        this.notifyMinBrokerIdChanged = notifyMinBrokerIdChanged;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
index 73d725c69..9a67ca5dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
@@ -58,6 +58,14 @@ public class BrokerData implements Comparable<BrokerData> {
         this.brokerAddrs = brokerAddrs;
     }
 
+    public BrokerData(String cluster, String brokerName, HashMap<Long, String> brokerAddrs,
+        boolean enableActingMaster) {
+        this.cluster = cluster;
+        this.brokerName = brokerName;
+        this.brokerAddrs = brokerAddrs;
+        this.enableActingMaster = enableActingMaster;
+    }
+
     /**
      * Selects a (preferably master) broker address from the registered list.
      * If the master's address cannot be found, a slave broker address is selected in a random manner.
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index d02e6c0ad..784d6eb0f 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -52,7 +52,6 @@ import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.srvutil.FileWatchService;
 
-
 public class NamesrvController {
     private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
     private static final InternalLogger WATER_MARK_LOG = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_WATER_MARK_LOGGER_NAME);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
index 6f624e31b..85b36eb4b 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
@@ -56,9 +56,6 @@ public class ClientRequestProcessor implements NettyRequestProcessor {
                     String orderTopicConf =
                         this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                             requestHeader.getTopic());
-
-                    // Get a shallow clone of route data to modify order topic conf
-                    topicRouteData = topicRouteData.cloneTopicRouteData();
                     topicRouteData.setOrderTopicConf(orderTopicConf);
                 }
 
@@ -87,8 +84,6 @@ public class ClientRequestProcessor implements NettyRequestProcessor {
             if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                 String orderTopicConf =
                     this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic);
-                // Get a shallow clone of route data to modify order topic conf
-                topicRouteData = topicRouteData.cloneTopicRouteData();
                 topicRouteData.setOrderTopicConf(orderTopicConf);
             }
 
@@ -100,7 +95,8 @@ public class ClientRequestProcessor implements NettyRequestProcessor {
         return response;
     }
 
-    @Override public boolean rejectRequest() {
+    @Override
+    public boolean rejectRequest() {
         return false;
     }
 }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 3a0bc625d..5c9a9340d 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -499,12 +499,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         final GetTopicsByClusterRequestHeader requestHeader =
             (GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);
 
-        TopicList topicsByCluster = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());
-        byte[] body = topicsByCluster.encode();
+        boolean enableTopicList = namesrvController.getNamesrvConfig().isEnableTopicList();
+        if (enableTopicList) {
+            TopicList topicsByCluster = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());
+            byte[] body = topicsByCluster.encode();
+
+            response.setBody(body);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("disable");
+        }
 
-        response.setBody(body);
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
         return response;
     }
 
@@ -525,12 +532,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
-        TopicList unitTopics = this.namesrvController.getRouteInfoManager().getUnitTopics();
-        byte[] body = unitTopics.encode();
+        boolean enableTopicList = namesrvController.getNamesrvConfig().isEnableTopicList();
+
+        if (enableTopicList) {
+            TopicList unitTopicList = this.namesrvController.getRouteInfoManager().getUnitTopics();
+            byte[] body = unitTopicList.encode();
+            response.setBody(body);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("disable");
+        }
 
-        response.setBody(body);
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
         return response;
     }
 
@@ -538,12 +552,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
-        TopicList hasUnitSubTopicList = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();
-        byte[] body = hasUnitSubTopicList.encode();
+        boolean enableTopicList = namesrvController.getNamesrvConfig().isEnableTopicList();
+
+        if (enableTopicList) {
+            TopicList hasUnitSubTopicList = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();
+            byte[] body = hasUnitSubTopicList.encode();
+            response.setBody(body);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("disable");
+        }
 
-        response.setBody(body);
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
         return response;
     }
 
@@ -551,12 +572,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
-        TopicList hasUnitSubUnUnitTopicList = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList();
-        byte[] body = hasUnitSubUnUnitTopicList.encode();
+        boolean enableTopicList = namesrvController.getNamesrvConfig().isEnableTopicList();
+
+        if (enableTopicList) {
+            TopicList hasUnitSubUnUnitTopicList = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList();
+            byte[] body = hasUnitSubUnUnitTopicList.encode();
+            response.setBody(body);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("disable");
+        }
 
-        response.setBody(body);
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
         return response;
     }
 
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index f64640a26..aac1259ea 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -73,9 +74,6 @@ public class RouteInfoManager {
     private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
     private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
 
-    // Contains topic route data needed by clients
-    private final Map<String /* topic */, TopicRouteData> topicRouteDataMap;
-
     private final BatchUnRegisterService unRegisterService;
 
     private final NamesrvController namesrvController;
@@ -88,7 +86,6 @@ public class RouteInfoManager {
         this.brokerLiveTable = new ConcurrentHashMap<BrokerAddrInfo, BrokerLiveInfo>(256);
         this.filterServerTable = new ConcurrentHashMap<BrokerAddrInfo, List<String>>(256);
         this.topicQueueMappingInfoTable = new ConcurrentHashMap<String, Map<String, TopicQueueMappingInfo>>(1024);
-        this.topicRouteDataMap = new ConcurrentHashMap<>(1024);
         this.unRegisterService = new BatchUnRegisterService(this, namesrvConfig);
         this.namesrvConfig = namesrvConfig;
         this.namesrvController = namesrvController;
@@ -140,8 +137,6 @@ public class RouteInfoManager {
 
                     this.topicQueueTable.put(topic, queueDataMap);
                     log.info("Register topic route:{}, {}", topic, queueDatas);
-
-                    updateTopicRouteData(Sets.newHashSet(topic));
                 }
             } finally {
                 this.lock.writeLock().unlock();
@@ -156,7 +151,6 @@ public class RouteInfoManager {
             try {
                 this.lock.writeLock().lockInterruptibly();
                 this.topicQueueTable.remove(topic);
-                this.updateTopicRouteData(Sets.newHashSet(topic));
             } finally {
                 this.lock.writeLock().unlock();
             }
@@ -186,7 +180,6 @@ public class RouteInfoManager {
                             break;
                         }
                     }
-                    this.updateTopicRouteData(Sets.newHashSet(topic));
                 }
             } finally {
                 this.lock.writeLock().unlock();
@@ -294,8 +287,6 @@ public class RouteInfoManager {
                 String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
                 registerFirst = registerFirst || (null == oldAddr);
 
-                Set<String> configChangedTopics = new HashSet<>();
-
                 boolean isMaster = MixAll.MASTER_ID == brokerId;
                 boolean isPrimeSlave = !isOldVersionBroker && !isMaster
                     && brokerId == Collections.min(brokerAddrsMap.keySet());
@@ -306,17 +297,14 @@ public class RouteInfoManager {
                         topicConfigWrapper.getTopicConfigTable();
                     if (tcTable != null) {
                         for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
-                            final TopicConfig topicConfig = entry.getValue();
-                            String topicName = topicConfig.getTopicName();
                             if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
-                                topicConfigWrapper.getDataVersion(), brokerName, topicName)) {
-                                configChangedTopics.add(topicName);
-
+                                topicConfigWrapper.getDataVersion(), brokerName,
+                                entry.getValue().getTopicName())) {
+                                final TopicConfig topicConfig = entry.getValue();
                                 if (isPrimeSlave) {
-                                    // Wipe the write perm for prime slave
+                                    // Wipe write perm for prime slave
                                     topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                                 }
-
                                 this.createAndUpdateQueueData(brokerName, topicConfig);
                             }
                         }
@@ -369,14 +357,7 @@ public class RouteInfoManager {
                     }
                 }
 
-                if (registerFirst && MixAll.MASTER_ID != brokerId) {
-                    configChangedTopics = this.topicSetOfBrokerName(brokerName);
-                }
-
-                // Update topicRouteDataMap
-                updateTopicRouteData(configChangedTopics);
-
-                if (isMinBrokerIdChanged && brokerData.isEnableActingMaster()) {
+                if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
                     notifyMinBrokerIdChanged(brokerAddrsMap, null,
                         this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
                 }
@@ -390,54 +371,6 @@ public class RouteInfoManager {
         return result;
     }
 
-    private void updateTopicRouteData(final Set<String> changedTopics) {
-        for (final String changedTopic : changedTopics) {
-            if (!this.topicQueueTable.containsKey(changedTopic)) {
-                // This topic doesn't have any queue data route info
-                // Just remove it from topicRouteDataMap
-                this.topicRouteDataMap.remove(changedTopic);
-                continue;
-            }
-
-            TopicRouteData topicRouteData = new TopicRouteData();
-            boolean foundBrokerData = false;
-            List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
-            HashMap<String, List<String>> filterServerMap = topicRouteData.getFilterServerTable();
-
-            Map<String, QueueData> queueDataMap = this.topicQueueTable.get(changedTopic);
-            if (queueDataMap != null) {
-                // Deep copy the queue data list
-                List<QueueData> queueDataList = topicRouteData.getQueueDatas();
-                for (final QueueData queueData : queueDataMap.values()) {
-                    queueDataList.add(new QueueData(queueData));
-                }
-
-                Set<String> brokerNameSet = new HashSet<String>(queueDataMap.keySet());
-
-                for (String brokerName : brokerNameSet) {
-                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
-                    if (null != brokerData) {
-                        BrokerData brokerDataClone = new BrokerData(brokerData);
-                        brokerDataList.add(brokerDataClone);
-                        foundBrokerData = true;
-
-                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
-                            BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
-                            List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
-                            if (filterServerList != null) {
-                                filterServerMap.put(brokerAddr, new ArrayList<String>(filterServerList));
-                            }
-                        }
-                    }
-                }
-            }
-
-            if (foundBrokerData) {
-                this.topicRouteDataMap.put(changedTopic, topicRouteData);
-            }
-        }
-    }
-
     public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String brokerName) {
         BrokerMemberGroup groupMember = new BrokerMemberGroup(clusterName, brokerName);
         try {
@@ -564,7 +497,6 @@ public class RouteInfoManager {
 
             final QueueData qd = qdMap.get(brokerName);
             if (qd != null) {
-                changedTopics.add(entry.getKey());
                 int perm = qd.getPerm();
                 switch (requestCode) {
                     case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
@@ -578,7 +510,6 @@ public class RouteInfoManager {
                 topicCnt++;
             }
         }
-        this.updateTopicRouteData(changedTopics);
         return topicCnt;
     }
 
@@ -665,10 +596,9 @@ public class RouteInfoManager {
                     }
                 }
 
-                Set<String> changedTopics = cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
-                this.updateTopicRouteData(changedTopics);
+                cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
 
-                if (!needNotifyBrokerMap.isEmpty()) {
+                if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) {
                     notifyMinBrokerIdChanged(needNotifyBrokerMap);
                 }
             } finally {
@@ -679,8 +609,7 @@ public class RouteInfoManager {
         }
     }
 
-    private Set<String> cleanTopicByUnRegisterRequests(Set<String> removedBroker, Set<String> reducedBroker) {
-        Set<String> changedTopics = new HashSet<>();
+    private void cleanTopicByUnRegisterRequests(Set<String> removedBroker, Set<String> reducedBroker) {
         Iterator<Entry<String, Map<String, QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
         while (itMap.hasNext()) {
             Entry<String, Map<String, QueueData>> entry = itMap.next();
@@ -691,13 +620,12 @@ public class RouteInfoManager {
             for (final String brokerName : removedBroker) {
                 final QueueData removedQD = queueDataMap.remove(brokerName);
                 if (removedQD != null) {
-                    changedTopics.add(topic);
-                    log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, removedQD);
+                    log.debug("removeTopicByBrokerName, remove one broker's topic {} {}", topic, removedQD);
                 }
             }
 
             if (queueDataMap.isEmpty()) {
-                log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
+                log.debug("removeTopicByBrokerName, remove the topic all queue {}", topic);
                 itMap.remove();
             }
 
@@ -705,7 +633,6 @@ public class RouteInfoManager {
                 final QueueData queueData = queueDataMap.get(brokerName);
 
                 if (queueData != null) {
-                    changedTopics.add(topic);
                     if (this.brokerAddrTable.get(brokerName).isEnableActingMaster()) {
                         // Master has been unregistered, wipe the write perm
                         if (isNoMasterExists(brokerName)) {
@@ -715,8 +642,6 @@ public class RouteInfoManager {
                 }
             }
         }
-
-        return changedTopics;
     }
 
     private boolean isNoMasterExists(String brokerName) {
@@ -732,40 +657,76 @@ public class RouteInfoManager {
         return Collections.min(brokerData.getBrokerAddrs().keySet()) > 0;
     }
 
-    private Set<String> topicSetOfBrokerName(final String brokerName) {
-        Set<String> topicOfBroker = new HashSet<>();
-        for (final Entry<String, Map<String, QueueData>> entry : this.topicQueueTable.entrySet()) {
-            if (entry.getValue().containsKey(brokerName)) {
-                topicOfBroker.add(entry.getKey());
+    public TopicRouteData pickupTopicRouteData(final String topic) {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        boolean foundQueueData = false;
+        boolean foundBrokerData = false;
+        Set<String> brokerNameSet = new HashSet<String>();
+        List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
+        topicRouteData.setFilterServerTable(filterServerMap);
+
+        try {
+            try {
+                this.lock.readLock().lockInterruptibly();
+                Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
+                if (queueDataMap != null) {
+                    topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
+                    foundQueueData = true;
+
+                    brokerNameSet.addAll(queueDataMap.keySet());
+
+                    for (String brokerName : brokerNameSet) {
+                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+                        if (null != brokerData) {
+                            BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
+                                .getBrokerAddrs().clone(), brokerData.isEnableActingMaster());
+                            brokerDataList.add(brokerDataClone);
+                            foundBrokerData = true;
+                            if (!filterServerTable.isEmpty()) {
+                                for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
+                                    BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
+                                    List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
+                                    filterServerMap.put(brokerAddr, filterServerList);
+                                }
+                            }
+                        }
+                    }
+                }
+            } finally {
+                this.lock.readLock().unlock();
             }
+        } catch (Exception e) {
+            log.error("pickupTopicRouteData Exception", e);
         }
-        return topicOfBroker;
-    }
 
-    public TopicRouteData pickupTopicRouteData(final String topic) {
-        if (topic != null) {
-            final TopicRouteData routeData = this.topicRouteDataMap.get(topic);
-            if (routeData == null) {
+        log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
+
+        if (foundBrokerData && foundQueueData) {
+
+            if (topicRouteData == null) {
                 return null;
             }
-            routeData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
-            log.debug("pickupTopicRouteData {} {}", topic, routeData);
 
-            if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
-                return routeData;
-            }
+            topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
 
             if (!namesrvConfig.isSupportActingMaster()) {
-                return routeData;
+                return topicRouteData;
+            }
+
+            if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
+                return topicRouteData;
             }
 
-            if (routeData.getBrokerDatas().size() == 0 || routeData.getQueueDatas().size() == 0) {
-                return routeData;
+            if (topicRouteData.getBrokerDatas().size() == 0 || topicRouteData.getQueueDatas().size() == 0) {
+                return topicRouteData;
             }
 
             boolean needActingMaster = false;
 
-            for (final BrokerData brokerData : routeData.getBrokerDatas()) {
+            for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
                 if (brokerData.getBrokerAddrs().size() != 0
                     && !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                     needActingMaster = true;
@@ -774,19 +735,17 @@ public class RouteInfoManager {
             }
 
             if (!needActingMaster) {
-                return routeData;
+                return topicRouteData;
             }
 
-            final TopicRouteData cloneTopicRouteData = routeData.deepCloneTopicRouteData();
-
-            for (final BrokerData brokerData : cloneTopicRouteData.getBrokerDatas()) {
+            for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
                 final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
                 if (brokerAddrs.size() == 0 || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) {
                     continue;
                 }
 
                 // No master
-                for (final QueueData queueData : cloneTopicRouteData.getQueueDatas()) {
+                for (final QueueData queueData : topicRouteData.getQueueDatas()) {
                     if (queueData.getBrokerName().equals(brokerData.getBrokerName())) {
                         if (!PermName.isWriteable(queueData.getPerm())) {
                             final Long minBrokerId = Collections.min(brokerAddrs.keySet());
@@ -799,8 +758,9 @@ public class RouteInfoManager {
 
             }
 
-            return cloneTopicRouteData;
+            return topicRouteData;
         }
+
         return null;
     }