You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/09 06:53:55 UTC
[rocketmq] 02/02: Finish the topicRoute2endPoints for static topic
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 40d8626927d5d9cf4ff1eed309a365611826c9a3
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 9 14:53:24 2021 +0800
Finish the topicRoute2endPoints for static topic
---
.../client/impl/factory/MQClientInstance.java | 141 +++++++++------------
.../client/impl/producer/TopicPublishInfo.java | 5 +
.../java/org/apache/rocketmq/common/MixAll.java | 2 +
3 files changed, 67 insertions(+), 81 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 874b1f8..b31757e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -61,13 +61,15 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -77,21 +79,17 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
- private final InternalLogger log = ClientLogger.getLog();
+ private final static InternalLogger log = ClientLogger.getLog();
private final ClientConfig clientConfig;
private final int instanceIndex;
private final String clientId;
@@ -103,6 +101,7 @@ public class MQClientInstance {
private final MQClientAPIImpl mQClientAPIImpl;
private final MQAdminImpl mQAdminImpl;
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
+ private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<>();
private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock();
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
@@ -162,8 +161,42 @@ public class MQClientInstance {
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
+
+ public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
+ if (route.getTopicQueueMappingByBroker() == null
+ || route.getTopicQueueMappingByBroker().isEmpty()) {
+ return new ConcurrentHashMap<>();
+ }
+ ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<>();
+ int totalNums = 0;
+ for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
+ String brokerName = entry.getKey();
+ if (entry.getValue().getTotalQueues() > totalNums) {
+ if (totalNums != 0) {
+ log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
+ }
+ totalNums = entry.getValue().getTotalQueues();
+ }
+ for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
+ int globalId = idEntry.getKey();
+ MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
+ String oldBrokerName = mqEndPoints.put(mq, brokerName);
+ log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
+ }
+ }
+ //accomplish the static logic queues
+ for (int i = 0; i < totalNums; i++) {
+ MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
+ if (!mqEndPoints.containsKey(mq)) {
+ mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
+ }
+ }
+ return mqEndPoints;
+ }
+
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
+ // TODO should check the usage of raw route, it is better to remove such field
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
@@ -177,28 +210,13 @@ public class MQClientInstance {
}
info.setOrderTopic(true);
- } else if (route.getOrderTopicConf() == null && route.getLogicalQueuesInfo() != null) {
+ } else if (route.getOrderTopicConf() == null
+ && route.getTopicQueueMappingByBroker() != null
+ && !route.getTopicQueueMappingByBroker().isEmpty()) {
info.setOrderTopic(false);
- List<MessageQueue> messageQueueList = info.getMessageQueueList();
- LogicalQueuesInfo logicalQueueInfo = route.getLogicalQueuesInfo();
- for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : logicalQueueInfo.entrySet()) {
- boolean someWritable = false;
- for (LogicalQueueRouteData logicalQueueRouteData : entry.getValue()) {
- if (logicalQueueRouteData.isWritable()) {
- someWritable = true;
- break;
- }
- }
- if (!someWritable) {
- continue;
- }
- MessageQueue mq = new MessageQueue();
- mq.setQueueId(entry.getKey());
- mq.setBrokerName(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
- mq.setTopic(topic);
- messageQueueList.add(mq);
- }
- Collections.sort(messageQueueList, new Comparator<MessageQueue>() {
+ ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, route);
+ info.getMessageQueueList().addAll(mqEndPoints.keySet());
+ Collections.sort(info.getMessageQueueList(), new Comparator<MessageQueue>() {
@Override public int compare(MessageQueue o1, MessageQueue o2) {
return MixAll.compareInteger(o1.getQueueId(), o2.getQueueId());
}
@@ -239,26 +257,10 @@ public class MQClientInstance {
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue> mqList = new HashSet<MessageQueue>();
- if (route.getLogicalQueuesInfo() != null) {
- LogicalQueuesInfo logicalQueueInfo = route.getLogicalQueuesInfo();
- for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : logicalQueueInfo.entrySet()) {
- boolean someReadable = false;
- for (LogicalQueueRouteData logicalQueueRouteData : entry.getValue()) {
- if (logicalQueueRouteData.isReadable()) {
- someReadable = true;
- break;
- }
- }
- if (!someReadable) {
- continue;
- }
- MessageQueue mq = new MessageQueue();
- mq.setQueueId(entry.getKey());
- mq.setBrokerName(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
- mq.setTopic(topic);
- mqList.add(mq);
- }
- return mqList;
+ if (route.getTopicQueueMappingByBroker() != null
+ && !route.getTopicQueueMappingByBroker().isEmpty()) {
+ ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, route);
+ return mqEndPoints.keySet();
}
List<QueueData> qds = route.getQueueDatas();
for (QueueData qd : qds) {
@@ -656,11 +658,6 @@ public class MQClientInstance {
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
- return this.updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer, null);
- }
-
- public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
- DefaultMQProducer defaultMQProducer, Set<Integer> logicalQueueIdsFilter) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
@@ -676,7 +673,7 @@ public class MQClientInstance {
}
}
} else {
- topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout(), true, logicalQueueIdsFilter);
+ topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
@@ -688,31 +685,19 @@ public class MQClientInstance {
}
if (changed) {
- TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
- if (logicalQueueIdsFilter != null && cloneTopicRouteData.getLogicalQueuesInfo() != null) {
- TopicRouteData curTopicRouteData = this.topicRouteTable.get(topic);
- if (curTopicRouteData != null) {
- LogicalQueuesInfo curLogicalQueuesInfo = curTopicRouteData.getLogicalQueuesInfo();
- if (curLogicalQueuesInfo != null) {
- LogicalQueuesInfo cloneLogicalQueuesInfo = cloneTopicRouteData.getLogicalQueuesInfo();
- curLogicalQueuesInfo.readLock().lock();
- try {
- for (Entry<Integer, List<LogicalQueueRouteData>> entry : curLogicalQueuesInfo.entrySet()) {
- if (!cloneLogicalQueuesInfo.containsKey(entry.getKey())) {
- cloneLogicalQueuesInfo.put(entry.getKey(), entry.getValue());
- }
- }
- } finally {
- curLogicalQueuesInfo.readLock().unlock();
- }
- }
- }
- }
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
+ // Update endpoint map
+ {
+ ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
+ if (mqEndPoints != null && !mqEndPoints.isEmpty()) {
+ topicEndPointsTable.put(topic, mqEndPoints);
+ }
+ }
+
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
@@ -739,6 +724,7 @@ public class MQClientInstance {
}
}
}
+ TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
@@ -865,13 +851,6 @@ public class MQClientInstance {
private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
if (olddata == null || nowdata == null)
return true;
- LogicalQueuesInfo oldLogicalQueuesInfo = olddata.getLogicalQueuesInfo();
- LogicalQueuesInfo newLogicalQueuesInfo = nowdata.getLogicalQueuesInfo();
- if (oldLogicalQueuesInfo != null && newLogicalQueuesInfo != null) {
- return oldLogicalQueuesInfo.keySet().equals(newLogicalQueuesInfo.keySet());
- } else if (oldLogicalQueuesInfo != null || newLogicalQueuesInfo != null) {
- return true;
- }
TopicRouteData old = new TopicRouteData(olddata);
TopicRouteData now = new TopicRouteData(nowdata);
Collections.sort(old.getQueueDatas());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 2f8337e..60974cc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -17,9 +17,13 @@
package org.apache.rocketmq.client.impl.producer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -116,4 +120,5 @@ public class TopicPublishInfo {
public void setTopicRouteData(final TopicRouteData topicRouteData) {
this.topicRouteData = topicRouteData;
}
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 8a7183f..ce54dae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -88,6 +88,8 @@ public class MixAll {
public static final String REPLY_MESSAGE_FLAG = "reply";
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logical_queue_broker__";
+ public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logical_queue_broker_not_exist__";
+
public static final Type TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA = new TypeReference<List<LogicalQueueRouteData>>() {
}.getType();