You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/09 06:53:53 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated (7707275 -> 40d8626)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a change to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


    from 7707275  Finish the register process in namesrv
     new a64ad01  Add some notes, and revert the id map, set the globalId first
     new 40d8626  Finish the topicRoute2endPoints for static topic

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../client/impl/factory/MQClientInstance.java      | 141 +++++++++------------
 .../client/impl/producer/TopicPublishInfo.java     |   5 +
 .../java/org/apache/rocketmq/common/MixAll.java    |   2 +
 .../rocketmq/common/TopicQueueMappingDetail.java   |   4 +-
 .../rocketmq/common/TopicQueueMappingInfo.java     |   4 +-
 .../common/protocol/route/TopicRouteData.java      |   3 +-
 .../namesrv/routeinfo/RouteInfoManager.java        |   2 +-
 7 files changed, 74 insertions(+), 87 deletions(-)

[rocketmq] 02/02: Finish the topicRoute2endPoints for static topic

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 40d8626927d5d9cf4ff1eed309a365611826c9a3
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 9 14:53:24 2021 +0800

    Finish the topicRoute2endPoints for static topic
---
 .../client/impl/factory/MQClientInstance.java      | 141 +++++++++------------
 .../client/impl/producer/TopicPublishInfo.java     |   5 +
 .../java/org/apache/rocketmq/common/MixAll.java    |   2 +
 3 files changed, 67 insertions(+), 81 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 874b1f8..b31757e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -61,13 +61,15 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageQueueAssignment;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -77,21 +79,17 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-
 public class MQClientInstance {
     private final static long LOCK_TIMEOUT_MILLIS = 3000;
-    private final InternalLogger log = ClientLogger.getLog();
+    private final static InternalLogger log = ClientLogger.getLog();
     private final ClientConfig clientConfig;
     private final int instanceIndex;
     private final String clientId;
@@ -103,6 +101,7 @@ public class MQClientInstance {
     private final MQClientAPIImpl mQClientAPIImpl;
     private final MQAdminImpl mQAdminImpl;
     private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
+    private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<>();
     private final Lock lockNamesrv = new ReentrantLock();
     private final Lock lockHeartbeat = new ReentrantLock();
     private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
@@ -162,8 +161,42 @@ public class MQClientInstance {
             MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
     }
 
+
+    public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
+        if (route.getTopicQueueMappingByBroker() == null
+            || route.getTopicQueueMappingByBroker().isEmpty()) {
+            return new ConcurrentHashMap<>();
+        }
+        ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<>();
+        int totalNums = 0;
+        for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
+            String brokerName = entry.getKey();
+            if (entry.getValue().getTotalQueues() > totalNums) {
+                if (totalNums != 0) {
+                    log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
+                }
+                totalNums = entry.getValue().getTotalQueues();
+            }
+            for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
+                int globalId = idEntry.getKey();
+                MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
+                String oldBrokerName = mqEndPoints.put(mq, brokerName);
+                log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
+            }
+        }
+        //accomplish the static logic queues
+        for (int i = 0; i < totalNums; i++) {
+            MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
+            if (!mqEndPoints.containsKey(mq)) {
+                mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
+            }
+        }
+        return mqEndPoints;
+    }
+
     public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
         TopicPublishInfo info = new TopicPublishInfo();
+        // TODO should check the usage of raw route, it is better to remove such field
         info.setTopicRouteData(route);
         if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
             String[] brokers = route.getOrderTopicConf().split(";");
@@ -177,28 +210,13 @@ public class MQClientInstance {
             }
 
             info.setOrderTopic(true);
-        } else if (route.getOrderTopicConf() == null  && route.getLogicalQueuesInfo() != null) {
+        } else if (route.getOrderTopicConf() == null
+                && route.getTopicQueueMappingByBroker() != null
+                && !route.getTopicQueueMappingByBroker().isEmpty()) {
             info.setOrderTopic(false);
-            List<MessageQueue> messageQueueList = info.getMessageQueueList();
-            LogicalQueuesInfo logicalQueueInfo = route.getLogicalQueuesInfo();
-            for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : logicalQueueInfo.entrySet()) {
-                boolean someWritable = false;
-                for (LogicalQueueRouteData logicalQueueRouteData : entry.getValue()) {
-                    if (logicalQueueRouteData.isWritable()) {
-                        someWritable = true;
-                        break;
-                    }
-                }
-                if (!someWritable) {
-                    continue;
-                }
-                MessageQueue mq = new MessageQueue();
-                mq.setQueueId(entry.getKey());
-                mq.setBrokerName(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-                mq.setTopic(topic);
-                messageQueueList.add(mq);
-            }
-            Collections.sort(messageQueueList, new Comparator<MessageQueue>() {
+            ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, route);
+            info.getMessageQueueList().addAll(mqEndPoints.keySet());
+            Collections.sort(info.getMessageQueueList(), new Comparator<MessageQueue>() {
                 @Override public int compare(MessageQueue o1, MessageQueue o2) {
                     return MixAll.compareInteger(o1.getQueueId(), o2.getQueueId());
                 }
@@ -239,26 +257,10 @@ public class MQClientInstance {
 
     public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
         Set<MessageQueue> mqList = new HashSet<MessageQueue>();
-        if (route.getLogicalQueuesInfo() != null) {
-            LogicalQueuesInfo logicalQueueInfo = route.getLogicalQueuesInfo();
-            for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : logicalQueueInfo.entrySet()) {
-                boolean someReadable = false;
-                for (LogicalQueueRouteData logicalQueueRouteData : entry.getValue()) {
-                    if (logicalQueueRouteData.isReadable()) {
-                        someReadable = true;
-                        break;
-                    }
-                }
-                if (!someReadable) {
-                    continue;
-                }
-                MessageQueue mq = new MessageQueue();
-                mq.setQueueId(entry.getKey());
-                mq.setBrokerName(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-                mq.setTopic(topic);
-                mqList.add(mq);
-            }
-            return mqList;
+        if (route.getTopicQueueMappingByBroker() != null
+                && !route.getTopicQueueMappingByBroker().isEmpty()) {
+            ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, route);
+            return mqEndPoints.keySet();
         }
         List<QueueData> qds = route.getQueueDatas();
         for (QueueData qd : qds) {
@@ -656,11 +658,6 @@ public class MQClientInstance {
 
     public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
         DefaultMQProducer defaultMQProducer) {
-        return this.updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer, null);
-    }
-
-    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
-        DefaultMQProducer defaultMQProducer, Set<Integer> logicalQueueIdsFilter) {
         try {
             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 try {
@@ -676,7 +673,7 @@ public class MQClientInstance {
                             }
                         }
                     } else {
-                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout(), true, logicalQueueIdsFilter);
+                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                     }
                     if (topicRouteData != null) {
                         TopicRouteData old = this.topicRouteTable.get(topic);
@@ -688,31 +685,19 @@ public class MQClientInstance {
                         }
 
                         if (changed) {
-                            TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
-                            if (logicalQueueIdsFilter != null && cloneTopicRouteData.getLogicalQueuesInfo() != null) {
-                                TopicRouteData curTopicRouteData = this.topicRouteTable.get(topic);
-                                if (curTopicRouteData != null) {
-                                    LogicalQueuesInfo curLogicalQueuesInfo = curTopicRouteData.getLogicalQueuesInfo();
-                                    if (curLogicalQueuesInfo != null) {
-                                        LogicalQueuesInfo cloneLogicalQueuesInfo = cloneTopicRouteData.getLogicalQueuesInfo();
-                                        curLogicalQueuesInfo.readLock().lock();
-                                        try {
-                                            for (Entry<Integer, List<LogicalQueueRouteData>> entry : curLogicalQueuesInfo.entrySet()) {
-                                                if (!cloneLogicalQueuesInfo.containsKey(entry.getKey())) {
-                                                    cloneLogicalQueuesInfo.put(entry.getKey(), entry.getValue());
-                                                }
-                                            }
-                                        } finally {
-                                            curLogicalQueuesInfo.readLock().unlock();
-                                        }
-                                    }
-                                }
-                            }
 
                             for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                             }
 
+                            // Update endpoint map
+                            {
+                                ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
+                                if (mqEndPoints != null && !mqEndPoints.isEmpty()) {
+                                    topicEndPointsTable.put(topic, mqEndPoints);
+                                }
+                            }
+
                             // Update Pub info
                             {
                                 TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
@@ -739,6 +724,7 @@ public class MQClientInstance {
                                     }
                                 }
                             }
+                            TopicRouteData cloneTopicRouteData =  new TopicRouteData(topicRouteData);
                             log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                             this.topicRouteTable.put(topic, cloneTopicRouteData);
                             return true;
@@ -865,13 +851,6 @@ public class MQClientInstance {
     private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
         if (olddata == null || nowdata == null)
             return true;
-        LogicalQueuesInfo oldLogicalQueuesInfo = olddata.getLogicalQueuesInfo();
-        LogicalQueuesInfo newLogicalQueuesInfo = nowdata.getLogicalQueuesInfo();
-        if (oldLogicalQueuesInfo != null && newLogicalQueuesInfo != null) {
-            return oldLogicalQueuesInfo.keySet().equals(newLogicalQueuesInfo.keySet());
-        } else if (oldLogicalQueuesInfo != null || newLogicalQueuesInfo != null) {
-            return true;
-        }
         TopicRouteData old = new TopicRouteData(olddata);
         TopicRouteData now = new TopicRouteData(nowdata);
         Collections.sort(old.getQueueDatas());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 2f8337e..60974cc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -17,9 +17,13 @@
 package org.apache.rocketmq.client.impl.producer;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 
@@ -116,4 +120,5 @@ public class TopicPublishInfo {
     public void setTopicRouteData(final TopicRouteData topicRouteData) {
         this.topicRouteData = topicRouteData;
     }
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 8a7183f..ce54dae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -88,6 +88,8 @@ public class MixAll {
     public static final String REPLY_MESSAGE_FLAG = "reply";
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
     public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logical_queue_broker__";
+    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logical_queue_broker_not_exist__";
+
     public static final Type TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA = new TypeReference<List<LogicalQueueRouteData>>() {
         }.getType();
 

[rocketmq] 01/02: Add some notes, and revert the id map, set the globalId first

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit a64ad01020e77b22bef18d5e240cb99df18170d4
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Nov 8 11:08:31 2021 +0800

    Add some notes, and revert the id map, set the globalId first
---
 .../main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java | 4 ++--
 .../main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java   | 4 ++--
 .../org/apache/rocketmq/common/protocol/route/TopicRouteData.java     | 3 ++-
 .../java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java  | 2 +-
 4 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index a90ca7e..d3e3d92 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -63,13 +63,13 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
                     && items.size() >= 1) {
                 LogicQueueMappingItem curr = items.get(items.size() - 1);
                 if (bname.equals(curr.getBname())) {
-                    tmpIdMap.put(curr.getQueueId(), globalId);
+                    tmpIdMap.put(globalId, curr.getQueueId());
                 }
             } else if (level == LEVEL_1
                     && items.size() >= 2) {
                 LogicQueueMappingItem prev = items.get(items.size() - 1);
                 if (bname.equals(prev.getBname())) {
-                    tmpIdMap.put(prev.getQueueId(), globalId);
+                    tmpIdMap.put(globalId, prev.getQueueId());
                 }
             }
         }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index c5bbeef..2d76365 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -29,9 +29,9 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     int totalQueues;
     String bname;  //identify the hosted broker name
     //register to broker to construct the route
-    ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
+    ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
     //register to broker to help detect remapping failure
-    protected ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
+    ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
 
     public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
         this.topic = topic;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
index c2cad6c..2ce6a53 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
@@ -33,7 +33,8 @@ public class TopicRouteData extends RemotingSerializable {
     private List<QueueData> queueDatas;
     private List<BrokerData> brokerDatas;
     private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
-    private Map<String, TopicQueueMappingInfo> topicQueueMappingByBroker;
+    //It could be null or empty
+    private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
 
     public TopicRouteData() {
     }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index bfdf53c..90bc197 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -163,7 +163,7 @@ public class RouteInfoManager {
                         TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
 
                         Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
-
+                        //the topicQueueMappingInfoMap should never be null, but can be empty
                         for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
                             if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                                 topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());