You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/05/05 07:37:11 UTC
[rocketmq] branch 5.0.0-beta updated: [ISSUE #4245] Remove the topic route cache in nameserver
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push:
new d937c1d61 [ISSUE #4245] Remove the topic route cache in nameserver
d937c1d61 is described below
commit d937c1d6158ba36a3836cd39dbbd5c96f4d9d797
Author: rongtong <ji...@163.com>
AuthorDate: Thu May 5 15:36:48 2022 +0800
[ISSUE #4245] Remove the topic route cache in nameserver
[ISSUE #4245] Remove the topic route cache in nameserver
---
.../rocketmq/common/namesrv/NamesrvConfig.java | 33 ++--
.../rocketmq/common/protocol/route/BrokerData.java | 8 +
.../apache/rocketmq/namesrv/NamesrvController.java | 1 -
.../namesrv/processor/ClientRequestProcessor.java | 8 +-
.../namesrv/processor/DefaultRequestProcessor.java | 68 +++++---
.../namesrv/routeinfo/RouteInfoManager.java | 184 ++++++++-------------
6 files changed, 151 insertions(+), 151 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index 002c9569f..698a0582b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -22,12 +22,8 @@ package org.apache.rocketmq.common.namesrv;
import java.io.File;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
public class NamesrvConfig {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
@@ -36,7 +32,6 @@ public class NamesrvConfig {
private boolean clusterTest = false;
private boolean orderMessageEnable = false;
private boolean returnOrderTopicConfigToBroker = true;
- private volatile boolean remoteFaultTolerance = true;
/**
* Indicates the number of threads used to handle client requests, such as GET_ROUTEINFO_BY_TOPIC.
@@ -71,15 +66,13 @@ public class NamesrvConfig {
*/
private boolean supportActingMaster = false;
- private volatile boolean enableAllTopicList = false;
+ private volatile boolean enableAllTopicList = true;
- public void setRemoteFaultTolerance(boolean remoteFaultTolerance) {
- this.remoteFaultTolerance = remoteFaultTolerance;
- }
- public boolean isRemoteFaultTolerance() {
- return remoteFaultTolerance;
- }
+ private volatile boolean enableTopicList = true;
+
+ private volatile boolean notifyMinBrokerIdChanged = true;
+
public boolean isOrderMessageEnable() {
return orderMessageEnable;
@@ -200,4 +193,20 @@ public class NamesrvConfig {
public void setEnableAllTopicList(boolean enableAllTopicList) {
this.enableAllTopicList = enableAllTopicList;
}
+
+ public boolean isEnableTopicList() {
+ return enableTopicList;
+ }
+
+ public void setEnableTopicList(boolean enableTopicList) {
+ this.enableTopicList = enableTopicList;
+ }
+
+ public boolean isNotifyMinBrokerIdChanged() {
+ return notifyMinBrokerIdChanged;
+ }
+
+ public void setNotifyMinBrokerIdChanged(boolean notifyMinBrokerIdChanged) {
+ this.notifyMinBrokerIdChanged = notifyMinBrokerIdChanged;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
index 73d725c69..9a67ca5dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
@@ -58,6 +58,14 @@ public class BrokerData implements Comparable<BrokerData> {
this.brokerAddrs = brokerAddrs;
}
+ public BrokerData(String cluster, String brokerName, HashMap<Long, String> brokerAddrs,
+ boolean enableActingMaster) {
+ this.cluster = cluster;
+ this.brokerName = brokerName;
+ this.brokerAddrs = brokerAddrs;
+ this.enableActingMaster = enableActingMaster;
+ }
+
/**
* Selects a (preferably master) broker address from the registered list.
* If the master's address cannot be found, a slave broker address is selected in a random manner.
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index d02e6c0ad..784d6eb0f 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -52,7 +52,6 @@ import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.srvutil.FileWatchService;
-
public class NamesrvController {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private static final InternalLogger WATER_MARK_LOG = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_WATER_MARK_LOGGER_NAME);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
index 6f624e31b..85b36eb4b 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
@@ -56,9 +56,6 @@ public class ClientRequestProcessor implements NettyRequestProcessor {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
-
- // Get a shallow clone of route data to modify order topic conf
- topicRouteData = topicRouteData.cloneTopicRouteData();
topicRouteData.setOrderTopicConf(orderTopicConf);
}
@@ -87,8 +84,6 @@ public class ClientRequestProcessor implements NettyRequestProcessor {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic);
- // Get a shallow clone of route data to modify order topic conf
- topicRouteData = topicRouteData.cloneTopicRouteData();
topicRouteData.setOrderTopicConf(orderTopicConf);
}
@@ -100,7 +95,8 @@ public class ClientRequestProcessor implements NettyRequestProcessor {
return response;
}
- @Override public boolean rejectRequest() {
+ @Override
+ public boolean rejectRequest() {
return false;
}
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 3a0bc625d..5c9a9340d 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -499,12 +499,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
final GetTopicsByClusterRequestHeader requestHeader =
(GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);
- TopicList topicsByCluster = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());
- byte[] body = topicsByCluster.encode();
+ boolean enableTopicList = namesrvController.getNamesrvConfig().isEnableTopicList();
+ if (enableTopicList) {
+ TopicList topicsByCluster = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());
+ byte[] body = topicsByCluster.encode();
+
+ response.setBody(body);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("disable");
+ }
- response.setBody(body);
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
return response;
}
@@ -525,12 +532,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- TopicList unitTopics = this.namesrvController.getRouteInfoManager().getUnitTopics();
- byte[] body = unitTopics.encode();
+ boolean enableTopicList = namesrvController.getNamesrvConfig().isEnableTopicList();
+
+ if (enableTopicList) {
+ TopicList unitTopicList = this.namesrvController.getRouteInfoManager().getUnitTopics();
+ byte[] body = unitTopicList.encode();
+ response.setBody(body);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("disable");
+ }
- response.setBody(body);
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
return response;
}
@@ -538,12 +552,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- TopicList hasUnitSubTopicList = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();
- byte[] body = hasUnitSubTopicList.encode();
+ boolean enableTopicList = namesrvController.getNamesrvConfig().isEnableTopicList();
+
+ if (enableTopicList) {
+ TopicList hasUnitSubTopicList = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();
+ byte[] body = hasUnitSubTopicList.encode();
+ response.setBody(body);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("disable");
+ }
- response.setBody(body);
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
return response;
}
@@ -551,12 +572,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- TopicList hasUnitSubUnUnitTopicList = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList();
- byte[] body = hasUnitSubUnUnitTopicList.encode();
+ boolean enableTopicList = namesrvController.getNamesrvConfig().isEnableTopicList();
+
+ if (enableTopicList) {
+ TopicList hasUnitSubUnUnitTopicList = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList();
+ byte[] body = hasUnitSubUnUnitTopicList.encode();
+ response.setBody(body);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("disable");
+ }
- response.setBody(body);
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
return response;
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index f64640a26..aac1259ea 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -73,9 +74,6 @@ public class RouteInfoManager {
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
- // Contains topic route data needed by clients
- private final Map<String /* topic */, TopicRouteData> topicRouteDataMap;
-
private final BatchUnRegisterService unRegisterService;
private final NamesrvController namesrvController;
@@ -88,7 +86,6 @@ public class RouteInfoManager {
this.brokerLiveTable = new ConcurrentHashMap<BrokerAddrInfo, BrokerLiveInfo>(256);
this.filterServerTable = new ConcurrentHashMap<BrokerAddrInfo, List<String>>(256);
this.topicQueueMappingInfoTable = new ConcurrentHashMap<String, Map<String, TopicQueueMappingInfo>>(1024);
- this.topicRouteDataMap = new ConcurrentHashMap<>(1024);
this.unRegisterService = new BatchUnRegisterService(this, namesrvConfig);
this.namesrvConfig = namesrvConfig;
this.namesrvController = namesrvController;
@@ -140,8 +137,6 @@ public class RouteInfoManager {
this.topicQueueTable.put(topic, queueDataMap);
log.info("Register topic route:{}, {}", topic, queueDatas);
-
- updateTopicRouteData(Sets.newHashSet(topic));
}
} finally {
this.lock.writeLock().unlock();
@@ -156,7 +151,6 @@ public class RouteInfoManager {
try {
this.lock.writeLock().lockInterruptibly();
this.topicQueueTable.remove(topic);
- this.updateTopicRouteData(Sets.newHashSet(topic));
} finally {
this.lock.writeLock().unlock();
}
@@ -186,7 +180,6 @@ public class RouteInfoManager {
break;
}
}
- this.updateTopicRouteData(Sets.newHashSet(topic));
}
} finally {
this.lock.writeLock().unlock();
@@ -294,8 +287,6 @@ public class RouteInfoManager {
String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
- Set<String> configChangedTopics = new HashSet<>();
-
boolean isMaster = MixAll.MASTER_ID == brokerId;
boolean isPrimeSlave = !isOldVersionBroker && !isMaster
&& brokerId == Collections.min(brokerAddrsMap.keySet());
@@ -306,17 +297,14 @@ public class RouteInfoManager {
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
- final TopicConfig topicConfig = entry.getValue();
- String topicName = topicConfig.getTopicName();
if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
- topicConfigWrapper.getDataVersion(), brokerName, topicName)) {
- configChangedTopics.add(topicName);
-
+ topicConfigWrapper.getDataVersion(), brokerName,
+ entry.getValue().getTopicName())) {
+ final TopicConfig topicConfig = entry.getValue();
if (isPrimeSlave) {
- // Wipe the write perm for prime slave
+ // Wipe write perm for prime slave
topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
}
-
this.createAndUpdateQueueData(brokerName, topicConfig);
}
}
@@ -369,14 +357,7 @@ public class RouteInfoManager {
}
}
- if (registerFirst && MixAll.MASTER_ID != brokerId) {
- configChangedTopics = this.topicSetOfBrokerName(brokerName);
- }
-
- // Update topicRouteDataMap
- updateTopicRouteData(configChangedTopics);
-
- if (isMinBrokerIdChanged && brokerData.isEnableActingMaster()) {
+ if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
notifyMinBrokerIdChanged(brokerAddrsMap, null,
this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
}
@@ -390,54 +371,6 @@ public class RouteInfoManager {
return result;
}
- private void updateTopicRouteData(final Set<String> changedTopics) {
- for (final String changedTopic : changedTopics) {
- if (!this.topicQueueTable.containsKey(changedTopic)) {
- // This topic doesn't have any queue data route info
- // Just remove it from topicRouteDataMap
- this.topicRouteDataMap.remove(changedTopic);
- continue;
- }
-
- TopicRouteData topicRouteData = new TopicRouteData();
- boolean foundBrokerData = false;
- List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
- HashMap<String, List<String>> filterServerMap = topicRouteData.getFilterServerTable();
-
- Map<String, QueueData> queueDataMap = this.topicQueueTable.get(changedTopic);
- if (queueDataMap != null) {
- // Deep copy the queue data list
- List<QueueData> queueDataList = topicRouteData.getQueueDatas();
- for (final QueueData queueData : queueDataMap.values()) {
- queueDataList.add(new QueueData(queueData));
- }
-
- Set<String> brokerNameSet = new HashSet<String>(queueDataMap.keySet());
-
- for (String brokerName : brokerNameSet) {
- BrokerData brokerData = this.brokerAddrTable.get(brokerName);
- if (null != brokerData) {
- BrokerData brokerDataClone = new BrokerData(brokerData);
- brokerDataList.add(brokerDataClone);
- foundBrokerData = true;
-
- for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
- BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
- List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
- if (filterServerList != null) {
- filterServerMap.put(brokerAddr, new ArrayList<String>(filterServerList));
- }
- }
- }
- }
- }
-
- if (foundBrokerData) {
- this.topicRouteDataMap.put(changedTopic, topicRouteData);
- }
- }
- }
-
public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String brokerName) {
BrokerMemberGroup groupMember = new BrokerMemberGroup(clusterName, brokerName);
try {
@@ -564,7 +497,6 @@ public class RouteInfoManager {
final QueueData qd = qdMap.get(brokerName);
if (qd != null) {
- changedTopics.add(entry.getKey());
int perm = qd.getPerm();
switch (requestCode) {
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
@@ -578,7 +510,6 @@ public class RouteInfoManager {
topicCnt++;
}
}
- this.updateTopicRouteData(changedTopics);
return topicCnt;
}
@@ -665,10 +596,9 @@ public class RouteInfoManager {
}
}
- Set<String> changedTopics = cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
- this.updateTopicRouteData(changedTopics);
+ cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
- if (!needNotifyBrokerMap.isEmpty()) {
+ if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) {
notifyMinBrokerIdChanged(needNotifyBrokerMap);
}
} finally {
@@ -679,8 +609,7 @@ public class RouteInfoManager {
}
}
- private Set<String> cleanTopicByUnRegisterRequests(Set<String> removedBroker, Set<String> reducedBroker) {
- Set<String> changedTopics = new HashSet<>();
+ private void cleanTopicByUnRegisterRequests(Set<String> removedBroker, Set<String> reducedBroker) {
Iterator<Entry<String, Map<String, QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
while (itMap.hasNext()) {
Entry<String, Map<String, QueueData>> entry = itMap.next();
@@ -691,13 +620,12 @@ public class RouteInfoManager {
for (final String brokerName : removedBroker) {
final QueueData removedQD = queueDataMap.remove(brokerName);
if (removedQD != null) {
- changedTopics.add(topic);
- log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, removedQD);
+ log.debug("removeTopicByBrokerName, remove one broker's topic {} {}", topic, removedQD);
}
}
if (queueDataMap.isEmpty()) {
- log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
+ log.debug("removeTopicByBrokerName, remove the topic all queue {}", topic);
itMap.remove();
}
@@ -705,7 +633,6 @@ public class RouteInfoManager {
final QueueData queueData = queueDataMap.get(brokerName);
if (queueData != null) {
- changedTopics.add(topic);
if (this.brokerAddrTable.get(brokerName).isEnableActingMaster()) {
// Master has been unregistered, wipe the write perm
if (isNoMasterExists(brokerName)) {
@@ -715,8 +642,6 @@ public class RouteInfoManager {
}
}
}
-
- return changedTopics;
}
private boolean isNoMasterExists(String brokerName) {
@@ -732,40 +657,76 @@ public class RouteInfoManager {
return Collections.min(brokerData.getBrokerAddrs().keySet()) > 0;
}
- private Set<String> topicSetOfBrokerName(final String brokerName) {
- Set<String> topicOfBroker = new HashSet<>();
- for (final Entry<String, Map<String, QueueData>> entry : this.topicQueueTable.entrySet()) {
- if (entry.getValue().containsKey(brokerName)) {
- topicOfBroker.add(entry.getKey());
+ public TopicRouteData pickupTopicRouteData(final String topic) {
+ TopicRouteData topicRouteData = new TopicRouteData();
+ boolean foundQueueData = false;
+ boolean foundBrokerData = false;
+ Set<String> brokerNameSet = new HashSet<String>();
+ List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
+ topicRouteData.setBrokerDatas(brokerDataList);
+
+ HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
+ topicRouteData.setFilterServerTable(filterServerMap);
+
+ try {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
+ if (queueDataMap != null) {
+ topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
+ foundQueueData = true;
+
+ brokerNameSet.addAll(queueDataMap.keySet());
+
+ for (String brokerName : brokerNameSet) {
+ BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+ if (null != brokerData) {
+ BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
+ .getBrokerAddrs().clone(), brokerData.isEnableActingMaster());
+ brokerDataList.add(brokerDataClone);
+ foundBrokerData = true;
+ if (!filterServerTable.isEmpty()) {
+ for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
+ BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
+ List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
+ filterServerMap.put(brokerAddr, filterServerList);
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ this.lock.readLock().unlock();
}
+ } catch (Exception e) {
+ log.error("pickupTopicRouteData Exception", e);
}
- return topicOfBroker;
- }
- public TopicRouteData pickupTopicRouteData(final String topic) {
- if (topic != null) {
- final TopicRouteData routeData = this.topicRouteDataMap.get(topic);
- if (routeData == null) {
+ log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
+
+ if (foundBrokerData && foundQueueData) {
+
+ if (topicRouteData == null) {
return null;
}
- routeData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
- log.debug("pickupTopicRouteData {} {}", topic, routeData);
- if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
- return routeData;
- }
+ topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
if (!namesrvConfig.isSupportActingMaster()) {
- return routeData;
+ return topicRouteData;
+ }
+
+ if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
+ return topicRouteData;
}
- if (routeData.getBrokerDatas().size() == 0 || routeData.getQueueDatas().size() == 0) {
- return routeData;
+ if (topicRouteData.getBrokerDatas().size() == 0 || topicRouteData.getQueueDatas().size() == 0) {
+ return topicRouteData;
}
boolean needActingMaster = false;
- for (final BrokerData brokerData : routeData.getBrokerDatas()) {
+ for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
if (brokerData.getBrokerAddrs().size() != 0
&& !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
needActingMaster = true;
@@ -774,19 +735,17 @@ public class RouteInfoManager {
}
if (!needActingMaster) {
- return routeData;
+ return topicRouteData;
}
- final TopicRouteData cloneTopicRouteData = routeData.deepCloneTopicRouteData();
-
- for (final BrokerData brokerData : cloneTopicRouteData.getBrokerDatas()) {
+ for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
if (brokerAddrs.size() == 0 || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) {
continue;
}
// No master
- for (final QueueData queueData : cloneTopicRouteData.getQueueDatas()) {
+ for (final QueueData queueData : topicRouteData.getQueueDatas()) {
if (queueData.getBrokerName().equals(brokerData.getBrokerName())) {
if (!PermName.isWriteable(queueData.getPerm())) {
final Long minBrokerId = Collections.min(brokerAddrs.keySet());
@@ -799,8 +758,9 @@ public class RouteInfoManager {
}
- return cloneTopicRouteData;
+ return topicRouteData;
}
+
return null;
}