You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/07 15:48:29 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Finish the
register process in namesrv
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 7707275 Finish the register process in namesrv
7707275 is described below
commit 7707275e37349a4ce6ed4a32547683196de17ff8
Author: dongeforever <do...@apache.org>
AuthorDate: Sun Nov 7 23:48:13 2021 +0800
Finish the register process in namesrv
---
.../apache/rocketmq/broker/BrokerController.java | 4 +-
.../broker/processor/AdminBrokerProcessor.java | 6 +-
.../broker/topic/TopicQueueMappingManager.java | 12 ++--
.../common/TopicConfigAndQueueMapping.java | 10 +--
...ppingInfo.java => TopicQueueMappingDetail.java} | 24 ++-----
.../rocketmq/common/TopicQueueMappingInfo.java | 79 ++++------------------
.../common/protocol/body/RegisterBrokerBody.java | 1 +
.../protocol/body/TopicQueueMappingBody.java | 4 +-
.../body/TopicQueueMappingSerializeWrapper.java | 8 +--
.../header/namesrv/GetRouteInfoRequestHeader.java | 20 ------
.../common/protocol/route/TopicRouteData.java | 27 ++++----
.../processor/ClusterTestRequestProcessor.java | 2 +-
.../namesrv/processor/DefaultRequestProcessor.java | 19 +-----
.../namesrv/routeinfo/RouteInfoManager.java | 74 +++++---------------
.../processor/DefaultRequestProcessorTest.java | 4 +-
15 files changed, 80 insertions(+), 214 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 926c43b..e08dbbe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1084,7 +1084,7 @@ public class BrokerController {
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
.map(TopicConfig::getTopicName)
.map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
- .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.clone4register()))
+ .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.cloneAsMappingInfo()))
.orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
@@ -1103,7 +1103,7 @@ public class BrokerController {
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
- entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().clone4register())
+ entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().cloneAsMappingInfo())
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 99c7031..6bd742c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1726,11 +1726,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
return response;
}
- TopicQueueMappingInfo topicQueueMappingInfo = null;
+ TopicQueueMappingDetail topicQueueMappingDetail = null;
if (Boolean.TRUE.equals(requestHeader.getWithMapping())) {
- topicQueueMappingInfo = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
+ topicQueueMappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
}
- String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingInfo));
+ String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingDetail));
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index da7ea8f..2f5d558 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
-import org.apache.rocketmq.common.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -41,18 +41,18 @@ public class TopicQueueMappingManager extends ConfigManager {
private final DataVersion dataVersion = new DataVersion();
private transient BrokerController brokerController;
- private final ConcurrentMap<String, TopicQueueMappingInfo> topicQueueMappingTable = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, TopicQueueMappingDetail> topicQueueMappingTable = new ConcurrentHashMap<>();
public TopicQueueMappingManager(BrokerController brokerController) {
this.brokerController = brokerController;
}
- public void updateTopicQueueMapping(TopicQueueMappingInfo topicQueueMappingInfo) {
- topicQueueMappingTable.put(topicQueueMappingInfo.getTopic(), topicQueueMappingInfo);
+ public void updateTopicQueueMapping(TopicQueueMappingDetail topicQueueMappingDetail) {
+ topicQueueMappingTable.put(topicQueueMappingDetail.getTopic(), topicQueueMappingDetail);
}
- public TopicQueueMappingInfo getTopicQueueMapping(String topic) {
+ public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
return topicQueueMappingTable.get(topic);
}
@@ -86,7 +86,7 @@ public class TopicQueueMappingManager extends ConfigManager {
}
}
- public ConcurrentMap<String, TopicQueueMappingInfo> getTopicQueueMappingTable() {
+ public ConcurrentMap<String, TopicQueueMappingDetail> getTopicQueueMappingTable() {
return topicQueueMappingTable;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
index 7b1ea40..e7fef8d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
@@ -18,15 +18,15 @@ package org.apache.rocketmq.common;
public class TopicConfigAndQueueMapping {
private TopicConfig topicConfig;
- private TopicQueueMappingInfo topicQueueMappingInfo;
+ private TopicQueueMappingDetail topicQueueMappingDetail;
- public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingInfo topicQueueMappingInfo) {
+ public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingDetail topicQueueMappingDetail) {
this.topicConfig = topicConfig;
- this.topicQueueMappingInfo = topicQueueMappingInfo;
+ this.topicQueueMappingDetail = topicQueueMappingDetail;
}
- public TopicQueueMappingInfo getTopicQueueMappingInfo() {
- return topicQueueMappingInfo;
+ public TopicQueueMappingDetail getTopicQueueMappingInfo() {
+ return topicQueueMappingDetail;
}
public TopicConfig getTopicConfig() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
similarity index 77%
copy from common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
copy to common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 85498df..a90ca7e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -17,31 +17,19 @@
package org.apache.rocketmq.common;
import com.google.common.collect.ImmutableList;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-public class TopicQueueMappingInfo extends RemotingSerializable {
- public static final int LEVEL_0 = 0;
- public static final int LEVEL_1 = 1;
+public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
- private String topic; // redundant field
- private int totalQueues;
- private String bname; //identify the hosted broker name
// the mapping info in current broker, do not register to nameserver
- private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
- //register to broker to construct the route
- private ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
- //register to broker to help detect remapping failure
- private ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
-
- public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
- this.topic = topic;
- this.totalQueues = totalQueues;
- this.bname = bname;
+ ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+
+ public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
+ super(topic, totalQueues, bname);
buildIdMap();
}
@@ -93,7 +81,7 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
}
- public TopicQueueMappingInfo clone4register() {
+ public TopicQueueMappingInfo cloneAsMappingInfo() {
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 85498df..c5bbeef 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -16,11 +16,8 @@
*/
package org.apache.rocketmq.common;
-import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -28,78 +25,20 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
public static final int LEVEL_0 = 0;
public static final int LEVEL_1 = 1;
- private String topic; // redundant field
- private int totalQueues;
- private String bname; //identify the hosted broker name
- // the mapping info in current broker, do not register to nameserver
- private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+ String topic; // redundant field
+ int totalQueues;
+ String bname; //identify the hosted broker name
//register to broker to construct the route
- private ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
+ ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure
- private ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
+ protected ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
this.topic = topic;
this.totalQueues = totalQueues;
this.bname = bname;
- buildIdMap();
}
- public boolean putMappingInfo(Integer globalId, ImmutableList<LogicQueueMappingItem> mappingInfo) {
- if (mappingInfo.isEmpty()) {
- return true;
- }
- hostedQueues.put(globalId, mappingInfo);
- buildIdMap();
- return true;
- }
-
- public void buildIdMap() {
- this.currIdMap = buildIdMap(LEVEL_0);
- this.prevIdMap = buildIdMap(LEVEL_1);
- }
-
- public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
- //level 0 means current leader in this broker
- //level 1 means previous leader in this broker
- assert level == LEVEL_0 || level == LEVEL_1;
-
- if (hostedQueues == null || hostedQueues.isEmpty()) {
- return new ConcurrentHashMap<Integer, Integer>();
- }
- ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
- for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry: hostedQueues.entrySet()) {
- Integer globalId = entry.getKey();
- ImmutableList<LogicQueueMappingItem> items = entry.getValue();
- if (level == LEVEL_0
- && items.size() >= 1) {
- LogicQueueMappingItem curr = items.get(items.size() - 1);
- if (bname.equals(curr.getBname())) {
- tmpIdMap.put(curr.getQueueId(), globalId);
- }
- } else if (level == LEVEL_1
- && items.size() >= 2) {
- LogicQueueMappingItem prev = items.get(items.size() - 1);
- if (bname.equals(prev.getBname())) {
- tmpIdMap.put(prev.getQueueId(), globalId);
- }
- }
- }
- return tmpIdMap;
- }
-
- public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
- return hostedQueues.get(globalId);
- }
-
-
- public TopicQueueMappingInfo clone4register() {
- TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
- topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
- topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
-
- return topicQueueMappingInfo;
- }
public int getTotalQueues() {
return totalQueues;
@@ -116,4 +55,12 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
public String getTopic() {
return topic;
}
+
+ public ConcurrentMap<Integer, Integer> getCurrIdMap() {
+ return currIdMap;
+ }
+
+ public ConcurrentMap<Integer, Integer> getPrevIdMap() {
+ return prevIdMap;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index 2d7597f..ea84ea1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -33,6 +33,7 @@ import java.util.zip.InflaterInputStream;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
index 4caba89..edb1d34 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
@@ -16,9 +16,9 @@
*/
package org.apache.rocketmq.common.protocol.body;
-import org.apache.rocketmq.common.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
-public class TopicQueueMappingBody extends TopicQueueMappingInfo {
+public class TopicQueueMappingBody extends TopicQueueMappingDetail {
public TopicQueueMappingBody(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
index 1d3d6c5..317ccd7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
@@ -18,20 +18,20 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.Map;
public class TopicQueueMappingSerializeWrapper extends RemotingSerializable {
- private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap;
+ private Map<String/* topic */, TopicQueueMappingDetail> topicQueueMappingInfoMap;
private DataVersion dataVersion = new DataVersion();
- public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
+ public Map<String, TopicQueueMappingDetail> getTopicQueueMappingInfoMap() {
return topicQueueMappingInfoMap;
}
- public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap) {
+ public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingDetail> topicQueueMappingInfoMap) {
this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
index ad776c8..a2806e6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
@@ -20,7 +20,6 @@
*/
package org.apache.rocketmq.common.protocol.header.namesrv;
-import java.util.Set;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -29,9 +28,6 @@ public class GetRouteInfoRequestHeader implements CommandCustomHeader {
@CFNotNull
private String topic;
- private int sysFlag;
- private Set<Integer> logicalQueueIdsFilter;
-
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -43,20 +39,4 @@ public class GetRouteInfoRequestHeader implements CommandCustomHeader {
public void setTopic(String topic) {
this.topic = topic;
}
-
- public int getSysFlag() {
- return sysFlag;
- }
-
- public void setSysFlag(int sysFlag) {
- this.sysFlag = sysFlag;
- }
-
- public void setLogicalQueueIdsFilter(Set<Integer> filter) {
- this.logicalQueueIdsFilter = filter;
- }
-
- public Set<Integer> getLogicalQueueIdsFilter() {
- return logicalQueueIdsFilter;
- }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
index 4470a2a..c2cad6c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
@@ -23,6 +23,9 @@ package org.apache.rocketmq.common.protocol.route;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicRouteData extends RemotingSerializable {
@@ -30,7 +33,7 @@ public class TopicRouteData extends RemotingSerializable {
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
- private LogicalQueuesInfo logicalQueuesInfo;
+ private Map<String, TopicQueueMappingInfo> topicQueueMappingByBroker;
public TopicRouteData() {
}
@@ -53,8 +56,8 @@ public class TopicRouteData extends RemotingSerializable {
this.filterServerTable.putAll(topicRouteData.filterServerTable);
}
- if (topicRouteData.logicalQueuesInfo != null) {
- this.logicalQueuesInfo = new LogicalQueuesInfo(topicRouteData.logicalQueuesInfo);
+ if (topicRouteData.topicQueueMappingByBroker != null) {
+ this.topicQueueMappingByBroker = new HashMap<String, TopicQueueMappingInfo>(topicRouteData.topicQueueMappingByBroker);
}
}
@@ -90,12 +93,12 @@ public class TopicRouteData extends RemotingSerializable {
this.orderTopicConf = orderTopicConf;
}
- public LogicalQueuesInfo getLogicalQueuesInfo() {
- return logicalQueuesInfo;
+ public Map<String, TopicQueueMappingInfo> getTopicQueueMappingByBroker() {
+ return topicQueueMappingByBroker;
}
- public void setLogicalQueuesInfo(LogicalQueuesInfo logicalQueuesInfo) {
- this.logicalQueuesInfo = logicalQueuesInfo;
+ public void setTopicQueueMappingByBroker(Map<String, TopicQueueMappingInfo> topicQueueMappingByBroker) {
+ this.topicQueueMappingByBroker = topicQueueMappingByBroker;
}
@Override
@@ -106,7 +109,7 @@ public class TopicRouteData extends RemotingSerializable {
result = prime * result + ((orderTopicConf == null) ? 0 : orderTopicConf.hashCode());
result = prime * result + ((queueDatas == null) ? 0 : queueDatas.hashCode());
result = prime * result + ((filterServerTable == null) ? 0 : filterServerTable.hashCode());
- result = prime * result + ((logicalQueuesInfo == null) ? 0 : logicalQueuesInfo.hashCode());
+ result = prime * result + ((topicQueueMappingByBroker == null) ? 0 : topicQueueMappingByBroker.hashCode());
return result;
}
@@ -139,10 +142,10 @@ public class TopicRouteData extends RemotingSerializable {
return false;
} else if (!filterServerTable.equals(other.filterServerTable))
return false;
- if (logicalQueuesInfo == null) {
- if (other.logicalQueuesInfo != null)
+ if (topicQueueMappingByBroker == null) {
+ if (other.topicQueueMappingByBroker != null)
return false;
- } else if (!logicalQueuesInfo.equals(other.logicalQueuesInfo))
+ } else if (!topicQueueMappingByBroker.equals(other.topicQueueMappingByBroker))
return false;
return true;
}
@@ -150,6 +153,6 @@ public class TopicRouteData extends RemotingSerializable {
@Override
public String toString() {
return "TopicRouteData [orderTopicConf=" + orderTopicConf + ", queueDatas=" + queueDatas
- + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + ", logicalQueuesInfo=" + logicalQueuesInfo + "]";
+ + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + ", topicQueueMappingInfoTable=" + topicQueueMappingByBroker + "]";
}
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
index dd15288..a58a3b9 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
@@ -56,7 +56,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
- TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), false);
+ TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 85e3883..98e96df 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.namesrv.processor;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
@@ -53,9 +52,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponse
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
-import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
-import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -353,9 +350,7 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
- boolean includeLogicalQueuesInfo = (requestHeader.getSysFlag() & MessageSysFlag.LOGICAL_QUEUE_FLAG) > 0;
-
- TopicRouteDataNameSrv topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), includeLogicalQueuesInfo);
+ TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
@@ -365,16 +360,6 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
topicRouteData.setOrderTopicConf(orderTopicConf);
}
- Set<Integer> logicalQueueIdsFilter = requestHeader.getLogicalQueueIdsFilter();
- if (logicalQueueIdsFilter != null) {
- LogicalQueuesInfoUnordered logicalQueuesInfo = topicRouteData.getLogicalQueuesInfoUnordered();
- if (logicalQueuesInfo != null) {
- LogicalQueuesInfoUnordered filtered = new LogicalQueuesInfoUnordered(logicalQueueIdsFilter.size());
- logicalQueueIdsFilter.forEach(integer -> filtered.put(integer, logicalQueuesInfo.get(integer)));
- topicRouteData.setLogicalQueuesInfoUnordered(filtered);
- }
- }
-
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index ab9e77f..bfdf53c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.namesrv.routeinfo;
import io.netty.channel.Channel;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -25,19 +24,18 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -45,13 +43,10 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
@@ -62,7 +57,8 @@ public class RouteInfoManager {
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
- private final ConcurrentMap<String/* topic */, LogicalQueuesInfoUnordered> logicalQueuesInfoTable;
+ private final HashMap<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
+
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
@@ -70,7 +66,7 @@ public class RouteInfoManager {
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
- this.logicalQueuesInfoTable = new ConcurrentHashMap<>(1024);
+ this.topicQueueMappingInfoTable = new HashMap<String, Map<String, TopicQueueMappingInfo>>(1024);
}
public byte[] getAllClusterInfo() {
@@ -158,20 +154,21 @@ public class RouteInfoManager {
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
- Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigWrapper.getLogicalQueuesInfoMap();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
- if (logicalQueuesInfoMap != null) {
- long startTime = System.nanoTime();
- for (Map.Entry<String, LogicalQueuesInfo> entry : logicalQueuesInfoMap.entrySet()) {
- String topicName = entry.getKey();
- LogicalQueuesInfoUnordered logicalQueuesInfo = ConcurrentHashMapUtil.computeIfAbsent(this.logicalQueuesInfoTable, topicName, ignore -> new LogicalQueuesInfoUnordered());
- mergeLogicalQueuesInfo(brokerName, topicName, logicalQueuesInfo, entry.getValue());
+
+ TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
+
+ Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
+
+ for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
+ if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
+ topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());
}
- log.debug("mergeQueueRouteDataTable topic={} time={}ns", System.nanoTime() - startTime);
+ topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
}
}
@@ -403,12 +400,8 @@ public class RouteInfoManager {
}
}
- public TopicRouteDataNameSrv pickupTopicRouteData(final String topic) {
- return pickupTopicRouteData(topic, false);
- }
-
- public TopicRouteDataNameSrv pickupTopicRouteData(final String topic, boolean includeLogicalQueuesInfo) {
- TopicRouteDataNameSrv topicRouteData = new TopicRouteDataNameSrv();
+ public TopicRouteData pickupTopicRouteData(final String topic) {
+ TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
@@ -417,6 +410,7 @@ public class RouteInfoManager {
HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);
+ topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
try {
try {
@@ -456,10 +450,6 @@ public class RouteInfoManager {
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
- if (includeLogicalQueuesInfo) {
- topicRouteData.setLogicalQueuesInfoUnordered(logicalQueuesInfoTable.get(topic));
- }
-
return topicRouteData;
}
@@ -790,34 +780,6 @@ public class RouteInfoManager {
return topicList.encode();
}
-
- private static void mergeLogicalQueuesInfo(String brokerName, String topicName,
- LogicalQueuesInfoUnordered logicalQueuesInfoInNamesrv,
- LogicalQueuesInfo logicalQueuesInfoFromBroker) {
- Set<LogicalQueuesInfoUnordered.Key> newKeys = logicalQueuesInfoFromBroker.values()
- .stream()
- .flatMap(Collection::stream)
- .filter(v -> Objects.equals(brokerName, v.getBrokerName()))
- .map(v -> new LogicalQueuesInfoUnordered.Key(null, v.getQueueId(), v.getOffsetDelta()))
- .collect(Collectors.toSet());
- logicalQueuesInfoInNamesrv.values().forEach(m ->
- m.values().removeIf(queueRouteData ->
- Objects.equals(brokerName, queueRouteData.getBrokerName()) &&
- !newKeys.contains(new LogicalQueuesInfoUnordered.Key(null, queueRouteData.getQueueId(), queueRouteData.getOffsetDelta()))));
- logicalQueuesInfoFromBroker.forEach((logicalQueueId, queueRouteDataListFromBroker) -> {
- if (logicalQueueId == null) {
- log.warn("queueRouteDataTable topic {} contains null logicalQueueId: {}", topicName, logicalQueuesInfoFromBroker);
- return;
- }
- queueRouteDataListFromBroker.stream()
- .filter(queueRouteDataFromBroker -> Objects.equals(brokerName, queueRouteDataFromBroker.getBrokerName()))
- .forEach(queueRouteDataFromBroker ->
- ConcurrentHashMapUtil.computeIfAbsent(logicalQueuesInfoInNamesrv, logicalQueueId, ignored -> new ConcurrentHashMap<>(queueRouteDataListFromBroker.size()))
- .put(new LogicalQueuesInfoUnordered.Key(brokerName, queueRouteDataFromBroker.getQueueId(), queueRouteDataFromBroker.getOffsetDelta()),
- queueRouteDataFromBroker)
- );
- });
- }
}
class BrokerLiveInfo {
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
index 97e6126..38bff74 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
@@ -200,7 +200,7 @@ public class DefaultRequestProcessorTest {
.contains(new HashMap.SimpleEntry("broker", broker));
}
- @Test
+ /*@Test
public void testProcessRequest_RegisterBrokerLogicalQueue() throws Exception {
String cluster = "cluster";
String broker1Name = "broker1";
@@ -299,7 +299,7 @@ public class DefaultRequestProcessorTest {
assertThat(topicRouteDataNameSrv.getLogicalQueuesInfoUnordered()).isEqualTo(logicalQueuesInfoUnordered);
}
}
-
+*/
@Test
public void testProcessRequest_RegisterBrokerWithFilterServer() throws RemotingCommandException,
NoSuchFieldException, IllegalAccessException {