You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/23 03:58:23 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the code

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 3d68590  Polish the code
3d68590 is described below

commit 3d68590377079e290c8c5f34b9a8834f70da02c4
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 23 11:58:04 2021 +0800

    Polish the code
---
 .../apache/rocketmq/common/rpc/ClientMetadata.java |  2 +-
 .../common/statictopic/TopicQueueMappingUtils.java | 74 ++++++++++++++++++++++
 .../remoting/netty/NettyRemotingClient.java        |  1 +
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    | 10 +++
 .../tools/admin/DefaultMQAdminExtImpl.java         | 31 +++++++++
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  5 ++
 6 files changed, 122 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 53ffa51..499b6a8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -112,7 +112,7 @@ public class ClientMetadata {
         for (Map.Entry<String, TopicQueueMappingInfo> entry : mappingInfos) {
             TopicQueueMappingInfo info = entry.getValue();
             if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) {
-                maxTotalNums = entry.getValue().getTotalQueues();
+                maxTotalNums = info.getTotalQueues();
             }
             for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
                 int globalId = idEntry.getKey();
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 7ac7ce8..dc88876 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -18,6 +18,9 @@ package org.apache.rocketmq.common.statictopic;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 
 import java.io.File;
 import java.util.AbstractMap;
@@ -29,6 +32,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class TopicQueueMappingUtils {
 
@@ -306,6 +311,75 @@ public class TopicQueueMappingUtils {
         }
     }
 
+    public Map<String, TopicConfigAndQueueMapping> createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+        Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
+        Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum); // defaults (now, queueNum) that apply when brokerConfigMap is empty
+        if (!brokerConfigMap.isEmpty()) {
+            maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+            globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+        }
+        if (queueNum < globalIdMap.size()) {
+            throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
+        }
+        //an unchanged queue number is not treated as a silent no-op: it is rejected with an exception as well
+        if (queueNum == globalIdMap.size()) {
+            throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
+        }
 
+        //the checks passed; count the queues per broker (every target broker starts at 0) and remember which broker owns each existing id
+        Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
+        for (String broker: targetBrokers) {
+            brokerNumMap.put(broker, 0);
+        }
+        final Map<Integer, String> oldIdToBroker = new HashMap<Integer, String>();
+        for (Map.Entry<Integer, TopicQueueMappingOne> entry : globalIdMap.entrySet()) {
+            String leaderbroker = entry.getValue().getBname();
+            oldIdToBroker.put(entry.getKey(), leaderbroker);
+            if (!brokerNumMap.containsKey(leaderbroker)) {
+                brokerNumMap.put(leaderbroker, 1);
+            } else {
+                brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
+            }
+        }
+        TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
+        allocator.upToNum(queueNum); // presumably extends the allocation until queueNum ids are assigned - confirm in MappingAllocator
+        Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
+
+        //build the config and mapping for the newly allocated ids; the new epoch is at least maxEpochAndNum.getKey() + 1000
+        long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
+        for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
+            Integer queueId = e.getKey();
+            String broker = e.getValue();
+            if (globalIdMap.containsKey(queueId)) {
+                //skip queue ids that already exist in globalIdMap; only new ids get a mapping item below
+                continue;
+            }
+            TopicConfigAndQueueMapping configMapping;
+            if (!brokerConfigMap.containsKey(broker)) {
+                configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
+                configMapping.setWriteQueueNums(1);
+                configMapping.setReadQueueNums(1);
+                brokerConfigMap.put(broker, configMapping);
+            } else {
+                configMapping = brokerConfigMap.get(broker);
+                configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
+                configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
+            }
+            LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
+            configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
+        }
+
+        // stamp every broker's mapping detail (pre-existing and newly created) with the new epoch and the total queue number
+        for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+            TopicConfigAndQueueMapping configMapping = entry.getValue();
+            configMapping.getMappingDetail().setEpoch(newEpoch);
+            configMapping.getMappingDetail().setTotalQueues(queueNum);
+        }
+        //re-run the same two consistency checks on the updated config before handing it back
+        TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+
+        return brokerConfigMap; // this is the caller's map, mutated in place above (new brokers added, queue nums and epoch updated)
+    } // NOTE(review): declared as an instance method although it only calls static TopicQueueMappingUtils helpers - confirm whether it was meant to be static
 
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 5ba6cfa..158f03f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -161,6 +161,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                 }
             });
 
+
         Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .option(ChannelOption.SO_KEEPALIVE, false)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 7030056..3ae0f8c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.tools.admin;
 
 import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -28,6 +29,9 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.AclConfig;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.RollbackStats;
@@ -221,6 +225,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public Map<String, TopicConfigAndQueueMapping> getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return this.defaultMQAdminExtImpl.getTopicConfigMap(clientMetadata, topic); // thin delegate: forwards both arguments unchanged to defaultMQAdminExtImpl
+    }
+
+    @Override
     public TopicStatsTable examineTopicStats(
         String topic) throws RemotingException, MQClientException, InterruptedException,
         MQBrokerException {
@@ -665,6 +674,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
     }
 
+
     @Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
         LogicalQueueRouteData fromQueueRouteData,
         LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 06b456f..31be976 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.tools.admin;
 
 import java.io.UnsupportedEncodingException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,6 +30,9 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -42,6 +46,9 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
@@ -82,6 +89,9 @@ import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -1101,6 +1111,27 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
+    public Map<String, TopicConfigAndQueueMapping> getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        TopicRouteData routeData = examineTopicRouteInfo(topic);
+        clientMetadata.freshTopicRoute(topic, routeData); // NOTE(review): routeData is passed on here before the null check below - confirm freshTopicRoute tolerates null
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
+
+        if (routeData != null
+                && !routeData.getQueueDatas().isEmpty()) {
+            for (QueueData queueData: routeData.getQueueDatas()) {
+                String bname = queueData.getBrokerName();
+                String addr = clientMetadata.findMasterBrokerAddr(bname); // NOTE(review): addr is used without a null check - confirm a master address is always resolvable
+                TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic); // NOTE(review): unchecked downcast - assumes examineTopicConfig returns a TopicConfigAndQueueMapping here; confirm
+                //a null config is tolerated: such a broker is simply left out of the returned map
+                if (mapping != null) {
+                    brokerConfigMap.put(bname, mapping);
+                }
+            }
+        }
+        return brokerConfigMap; // keyed by broker name; empty when there is no route data or no queue data
+    }
+
+    @Override
     public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
     }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 888dad8..f94b90d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -27,6 +27,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.AclConfig;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.RollbackStats;
@@ -340,6 +342,9 @@ public interface MQAdminExt extends MQAdmin {
     void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
             InterruptedException, MQClientException;
 
+    Map<String, TopicConfigAndQueueMapping> getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException; // returns the topic's TopicConfigAndQueueMapping per broker, keyed by broker name (see DefaultMQAdminExtImpl)
+
     void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData,
         LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
 }