You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/09 06:53:55 UTC

[rocketmq] 02/02: Finish the topicRoute2endPoints for static topic

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 40d8626927d5d9cf4ff1eed309a365611826c9a3
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 9 14:53:24 2021 +0800

    Finish the topicRoute2endPoints for static topic
---
 .../client/impl/factory/MQClientInstance.java      | 141 +++++++++------------
 .../client/impl/producer/TopicPublishInfo.java     |   5 +
 .../java/org/apache/rocketmq/common/MixAll.java    |   2 +
 3 files changed, 67 insertions(+), 81 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 874b1f8..b31757e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -61,13 +61,15 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageQueueAssignment;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -77,21 +79,17 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-
 public class MQClientInstance {
     private final static long LOCK_TIMEOUT_MILLIS = 3000;
-    private final InternalLogger log = ClientLogger.getLog();
+    private final static InternalLogger log = ClientLogger.getLog();
     private final ClientConfig clientConfig;
     private final int instanceIndex;
     private final String clientId;
@@ -103,6 +101,7 @@ public class MQClientInstance {
     private final MQClientAPIImpl mQClientAPIImpl;
     private final MQAdminImpl mQAdminImpl;
     private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
+    private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<>();
     private final Lock lockNamesrv = new ReentrantLock();
     private final Lock lockHeartbeat = new ReentrantLock();
     private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
@@ -162,8 +161,42 @@ public class MQClientInstance {
             MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
     }
 
+
+    /** Maps each logical queue of a static topic to its broker name; returns an empty map when the route has no queue mapping. */
+    public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
+        ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<>();
+        if (route.getTopicQueueMappingByBroker() == null || route.getTopicQueueMappingByBroker().isEmpty()) {
+            return mqEndPoints;
+        }
+        int totalNums = 0;
+        for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
+            String brokerName = entry.getKey();
+            int brokerTotalNums = entry.getValue().getTotalQueues();
+            // Warn on any disagreement between brokers (larger or smaller) and keep the largest total seen.
+            if (totalNums != 0 && brokerTotalNums != totalNums) {
+                log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, brokerTotalNums);
+            }
+            totalNums = Math.max(totalNums, brokerTotalNums);
+            for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
+                MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, idEntry.getKey());
+                String oldBrokerName = mqEndPoints.put(mq, brokerName);
+                // put() returns null for a first-time mapping; only a non-null previous value is a real duplicate.
+                if (oldBrokerName != null) {
+                    log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
+                }
+            }
+        }
+        // Fill in every queue id in [0, totalNums) that no broker listed, marking it with the "not exist" broker name.
+        for (int i = 0; i < totalNums; i++) {
+            MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
+            mqEndPoints.putIfAbsent(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
+        }
+        return mqEndPoints;
+    }
+
     public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
         TopicPublishInfo info = new TopicPublishInfo();
+        // TODO should check the usage of raw route, it is better to remove such field
         info.setTopicRouteData(route);
         if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
             String[] brokers = route.getOrderTopicConf().split(";");
@@ -177,28 +210,13 @@ public class MQClientInstance {
             }
 
             info.setOrderTopic(true);
-        } else if (route.getOrderTopicConf() == null  && route.getLogicalQueuesInfo() != null) {
+        } else if (route.getOrderTopicConf() == null
+                && route.getTopicQueueMappingByBroker() != null
+                && !route.getTopicQueueMappingByBroker().isEmpty()) {
             info.setOrderTopic(false);
-            List<MessageQueue> messageQueueList = info.getMessageQueueList();
-            LogicalQueuesInfo logicalQueueInfo = route.getLogicalQueuesInfo();
-            for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : logicalQueueInfo.entrySet()) {
-                boolean someWritable = false;
-                for (LogicalQueueRouteData logicalQueueRouteData : entry.getValue()) {
-                    if (logicalQueueRouteData.isWritable()) {
-                        someWritable = true;
-                        break;
-                    }
-                }
-                if (!someWritable) {
-                    continue;
-                }
-                MessageQueue mq = new MessageQueue();
-                mq.setQueueId(entry.getKey());
-                mq.setBrokerName(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-                mq.setTopic(topic);
-                messageQueueList.add(mq);
-            }
-            Collections.sort(messageQueueList, new Comparator<MessageQueue>() {
+            ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, route);
+            info.getMessageQueueList().addAll(mqEndPoints.keySet());
+            Collections.sort(info.getMessageQueueList(), new Comparator<MessageQueue>() {
                 @Override public int compare(MessageQueue o1, MessageQueue o2) {
                     return MixAll.compareInteger(o1.getQueueId(), o2.getQueueId());
                 }
@@ -239,26 +257,10 @@ public class MQClientInstance {
 
     public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
         Set<MessageQueue> mqList = new HashSet<MessageQueue>();
-        if (route.getLogicalQueuesInfo() != null) {
-            LogicalQueuesInfo logicalQueueInfo = route.getLogicalQueuesInfo();
-            for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : logicalQueueInfo.entrySet()) {
-                boolean someReadable = false;
-                for (LogicalQueueRouteData logicalQueueRouteData : entry.getValue()) {
-                    if (logicalQueueRouteData.isReadable()) {
-                        someReadable = true;
-                        break;
-                    }
-                }
-                if (!someReadable) {
-                    continue;
-                }
-                MessageQueue mq = new MessageQueue();
-                mq.setQueueId(entry.getKey());
-                mq.setBrokerName(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-                mq.setTopic(topic);
-                mqList.add(mq);
-            }
-            return mqList;
+        if (route.getTopicQueueMappingByBroker() != null
+                && !route.getTopicQueueMappingByBroker().isEmpty()) {
+            ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, route);
+            return mqEndPoints.keySet();
         }
         List<QueueData> qds = route.getQueueDatas();
         for (QueueData qd : qds) {
@@ -656,11 +658,6 @@ public class MQClientInstance {
 
     public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
         DefaultMQProducer defaultMQProducer) {
-        return this.updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer, null);
-    }
-
-    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
-        DefaultMQProducer defaultMQProducer, Set<Integer> logicalQueueIdsFilter) {
         try {
             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 try {
@@ -676,7 +673,7 @@ public class MQClientInstance {
                             }
                         }
                     } else {
-                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout(), true, logicalQueueIdsFilter);
+                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                     }
                     if (topicRouteData != null) {
                         TopicRouteData old = this.topicRouteTable.get(topic);
@@ -688,31 +685,19 @@ public class MQClientInstance {
                         }
 
                         if (changed) {
-                            TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
-                            if (logicalQueueIdsFilter != null && cloneTopicRouteData.getLogicalQueuesInfo() != null) {
-                                TopicRouteData curTopicRouteData = this.topicRouteTable.get(topic);
-                                if (curTopicRouteData != null) {
-                                    LogicalQueuesInfo curLogicalQueuesInfo = curTopicRouteData.getLogicalQueuesInfo();
-                                    if (curLogicalQueuesInfo != null) {
-                                        LogicalQueuesInfo cloneLogicalQueuesInfo = cloneTopicRouteData.getLogicalQueuesInfo();
-                                        curLogicalQueuesInfo.readLock().lock();
-                                        try {
-                                            for (Entry<Integer, List<LogicalQueueRouteData>> entry : curLogicalQueuesInfo.entrySet()) {
-                                                if (!cloneLogicalQueuesInfo.containsKey(entry.getKey())) {
-                                                    cloneLogicalQueuesInfo.put(entry.getKey(), entry.getValue());
-                                                }
-                                            }
-                                        } finally {
-                                            curLogicalQueuesInfo.readLock().unlock();
-                                        }
-                                    }
-                                }
-                            }
 
                             for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                             }
 
+                            // Update endpoint map
+                            {
+                                ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
+                                if (mqEndPoints != null && !mqEndPoints.isEmpty()) {
+                                    topicEndPointsTable.put(topic, mqEndPoints);
+                                }
+                            }
+
                             // Update Pub info
                             {
                                 TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
@@ -739,6 +724,7 @@ public class MQClientInstance {
                                     }
                                 }
                             }
+                            TopicRouteData cloneTopicRouteData =  new TopicRouteData(topicRouteData);
                             log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                             this.topicRouteTable.put(topic, cloneTopicRouteData);
                             return true;
@@ -865,13 +851,6 @@ public class MQClientInstance {
     private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
         if (olddata == null || nowdata == null)
             return true;
-        LogicalQueuesInfo oldLogicalQueuesInfo = olddata.getLogicalQueuesInfo();
-        LogicalQueuesInfo newLogicalQueuesInfo = nowdata.getLogicalQueuesInfo();
-        if (oldLogicalQueuesInfo != null && newLogicalQueuesInfo != null) {
-            return oldLogicalQueuesInfo.keySet().equals(newLogicalQueuesInfo.keySet());
-        } else if (oldLogicalQueuesInfo != null || newLogicalQueuesInfo != null) {
-            return true;
-        }
         TopicRouteData old = new TopicRouteData(olddata);
         TopicRouteData now = new TopicRouteData(nowdata);
         Collections.sort(old.getQueueDatas());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 2f8337e..60974cc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -17,9 +17,13 @@
 package org.apache.rocketmq.client.impl.producer;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 
@@ -116,4 +120,5 @@ public class TopicPublishInfo {
     public void setTopicRouteData(final TopicRouteData topicRouteData) {
         this.topicRouteData = topicRouteData;
     }
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 8a7183f..ce54dae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -88,6 +88,8 @@ public class MixAll {
     public static final String REPLY_MESSAGE_FLAG = "reply";
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
     public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logical_queue_broker__";
+    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logical_queue_broker_not_exist__";
+
     public static final Type TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA = new TypeReference<List<LogicalQueueRouteData>>() {
         }.getType();