You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/23 09:22:05 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated (3a14786 -> 7f7cf1b)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a change to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


    from 3a14786  Fix checkstyle
     new 47622e6  Polish doc, add todo list
     new 61fcf43  Finish the topic stats and consume stats in admin client
     new 7f7cf1b  Add test for topicStats and consumeStats

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../broker/processor/AdminBrokerProcessor.java     |  98 ++++--------------
 .../common/statictopic/TopicQueueMappingOne.java   |  46 +++++----
 .../common/statictopic/TopicQueueMappingUtils.java |   2 +-
 ..._Topic_Logic_Queue_\350\256\276\350\256\241.md" |  31 ++++++
 .../rocketmq/test/statictopic/StaticTopicIT.java   |  19 ++++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  92 +++++++++++------
 .../apache/rocketmq/tools/admin/MQAdminUtils.java  | 111 +++++++++++++++++++++
 7 files changed, 267 insertions(+), 132 deletions(-)

[rocketmq] 03/03: Add test for topicStats and consumeStats

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 7f7cf1b7aa4ae15fc4c685f8201dac8d5e2be4f1
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 23 17:21:33 2021 +0800

    Add test for topicStats and consumeStats
---
 .../broker/processor/AdminBrokerProcessor.java     | 25 ++++++++++++++++------
 .../rocketmq/test/statictopic/StaticTopicIT.java   | 19 ++++++++++++++++
 .../apache/rocketmq/tools/admin/MQAdminUtils.java  |  3 ++-
 3 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 61a8898..2891baf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1166,6 +1166,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 continue;
             }
 
+            TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
+
             {
                 SubscriptionData findSubscriptionData =
                     this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
@@ -1193,17 +1195,26 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                     requestHeader.getConsumerGroup(),
                     topic,
                     i);
-                if (consumerOffset < 0)
-                    consumerOffset = 0;
+                // the consumerOffset cannot be zero for static topic because of the "double read check" strategy
+                // just remain the logic for dynamic topic
+                // maybe we should remove it in the future
+                if (mappingDetail == null) {
+                    if (consumerOffset < 0)
+                        consumerOffset = 0;
+                }
 
                 offsetWrapper.setBrokerOffset(brokerOffset);
                 offsetWrapper.setConsumerOffset(consumerOffset);
 
-                long timeOffset = consumerOffset - 1;
-                if (timeOffset >= 0) {
-                    long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
-                    if (lastTimestamp > 0) {
-                        offsetWrapper.setLastTimestamp(lastTimestamp);
+                // the consumeOffset is not in this broker for static topic
+                // and may get the wrong result
+                if (mappingDetail == null) {
+                    long timeOffset = consumerOffset - 1;
+                    if (timeOffset >= 0) {
+                        long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
+                        if (lastTimestamp > 0) {
+                            offsetWrapper.setLastTimestamp(lastTimestamp);
+                        }
                     }
                 }
 
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index bc7cf25..22443d7 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -10,6 +10,7 @@ import org.apache.rocketmq.acl.common.AclUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -306,6 +307,15 @@ public class StaticTopicIT extends BaseConf {
         //use a new producer
         producer = getProducer(nsAddr, topic);
 
+        ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(group);
+        List<MessageQueue> messageQueues = producer.getMessageQueue();
+        for (MessageQueue queue: messageQueues) {
+            OffsetWrapper wrapper = consumeStats.getOffsetTable().get(queue);
+            Assert.assertNotNull(wrapper);
+            Assert.assertEquals(msgEachQueue, wrapper.getBrokerOffset());
+            Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+        }
+
         List<String> brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name);
         for (int i = 0; i < brokers.size(); i++) {
             Set<String> targetBrokers = ImmutableSet.of(brokers.get(i));
@@ -314,6 +324,15 @@ public class StaticTopicIT extends BaseConf {
             Thread.sleep(500);
             sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, (i + 1) * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
         }
+        consumeStats = defaultMQAdminExt.examineConsumeStats(group);
+
+        messageQueues = producer.getMessageQueue();
+        for (MessageQueue queue: messageQueues) {
+            OffsetWrapper wrapper = consumeStats.getOffsetTable().get(queue);
+            Assert.assertNotNull(wrapper);
+            Assert.assertEquals(msgEachQueue + brokers.size() * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, wrapper.getBrokerOffset());
+            Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+        }
         consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
         consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size());
     }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index cd2c4ac..d915cb1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -315,6 +315,7 @@ public class MQAdminUtils {
                 if (phyOffsetWrapper == null) {
                     continue;
                 }
+
                 if (consumerOffset == -1
                     && phyOffsetWrapper.getConsumerOffset() >= 0) {
                     consumerOffset = phyOffsetWrapper.getConsumerOffset();
@@ -322,7 +323,7 @@ public class MQAdminUtils {
                 }
                 if (brokerOffset == -1
                     && item.getLogicOffset() >= 0) {
-                    brokerOffset = item.computeStaticQueueOffsetStrictly(offsetWrapper.getBrokerOffset());
+                    brokerOffset = item.computeStaticQueueOffsetStrictly(phyOffsetWrapper.getBrokerOffset());
                 }
                 if (consumerOffset >= 0
                     && brokerOffset >= 0) {

[rocketmq] 02/03: Finish the topic stats and consume stats in admin client

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 61fcf438b12699bfb2dbb8e77fba293ff63b075c
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 23 16:48:56 2021 +0800

    Finish the topic stats and consume stats in admin client
---
 .../broker/processor/AdminBrokerProcessor.java     |  73 +-------------
 .../common/statictopic/TopicQueueMappingOne.java   |  46 +++++----
 .../common/statictopic/TopicQueueMappingUtils.java |   4 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |  92 ++++++++++-------
 .../apache/rocketmq/tools/admin/MQAdminUtils.java  | 110 +++++++++++++++++++++
 5 files changed, 199 insertions(+), 126 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0d2a59b..61a8898 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1019,69 +1019,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
-    private RpcResponse handleGetTopicStatsInfoForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) {
-        try {
-            assert request.getCode() == RequestCode.GET_TOPIC_STATS_INFO;
-            if (mappingContext.getMappingDetail() == null) {
-                return null;
-            }
-            final GetTopicStatsInfoRequestHeader requestHeader = (GetTopicStatsInfoRequestHeader) request.getHeader();
-            String topic = requestHeader.getTopic();
-            TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
-            Map<Integer, LogicQueueMappingItem[]> qidItemMap = new HashMap<>();
-            Set<String> brokers = new HashSet<>();
-            mappingDetail.getHostedQueues().forEach((qid, items) -> {
-                if (TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
-                    LogicQueueMappingItem[] itemPair = new LogicQueueMappingItem[2];
-                    itemPair[0] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, 0, true);
-                    itemPair[1] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, Long.MAX_VALUE, true);
-                    assert itemPair[0] != null && itemPair[1] != null;
-                    qidItemMap.put(qid, itemPair);
-                    brokers.add(itemPair[0].getBname());
-                    brokers.add(itemPair[1].getBname());
-                }
-            });
-            Map<String, TopicStatsTable> statsTable = new HashMap<>();
-            for (String broker: brokers) {
-                GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
-                header.setTopic(topic);
-                header.setBname(broker);
-                header.setLo(false);
-                RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
-                RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
-                if (rpcResponse.getException() != null) {
-                    throw rpcResponse.getException();
-                }
-                statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
-            }
-            TopicStatsTable topicStatsTable = new TopicStatsTable();
-            qidItemMap.forEach((qid, itemPair) -> {
-                LogicQueueMappingItem minItem = itemPair[0];
-                LogicQueueMappingItem maxItem = itemPair[1];
-                TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId()));
-                TopicOffset maxTopicOffset = statsTable.get(maxItem.getBname()).getOffsetTable().get(new MessageQueue(topic, maxItem.getBname(), maxItem.getQueueId()));
-
-                assert  minTopicOffset != null && maxTopicOffset != null;
-
-                long min = minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset());
-                if (min < 0)
-                    min = 0;
-                long max = maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset());
-                if (max < 0)
-                    max = 0;
-                long timestamp = maxTopicOffset.getLastUpdateTimestamp();
-
-                TopicOffset topicOffset = new TopicOffset();
-                topicOffset.setMinOffset(min);
-                topicOffset.setMaxOffset(max);
-                topicOffset.setLastUpdateTimestamp(timestamp);
-                topicStatsTable.getOffsetTable().put(new MessageQueue(topic, TopicQueueMappingUtils.getMockBrokerName(mappingDetail.getScope()), qid), topicOffset);
-            });
-            return new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable);
-        } catch (Throwable t) {
-            return new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t));
-        }
-    }
+
 
     private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
@@ -1097,15 +1035,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             return response;
         }
         TopicStatsTable topicStatsTable = new TopicStatsTable();
-        TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
-        RpcResponse rpcResponse = handleGetTopicStatsInfoForStaticTopic(new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, requestHeader, null), mappingContext);
-        if (rpcResponse != null) {
-            if (rpcResponse.getException() != null) {
-                return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
-            } else {
-                topicStatsTable.getOffsetTable().putAll(((TopicStatsTable)rpcResponse.getBody()).getOffsetTable());
-            }
-        }
 
         for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
             MessageQueue mq = new MessageQueue();
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
index 319e113..636f1d5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
@@ -28,8 +28,10 @@ public class TopicQueueMappingOne extends RemotingSerializable {
     String bname;  //identify the hosted broker name
     Integer globalId;
     List<LogicQueueMappingItem> items;
+    TopicQueueMappingDetail mappingDetail;
 
-    public TopicQueueMappingOne(String topic, String bname, Integer globalId, List<LogicQueueMappingItem> items) {
+    public TopicQueueMappingOne(TopicQueueMappingDetail mappingDetail, String topic, String bname, Integer globalId, List<LogicQueueMappingItem> items) {
+        this.mappingDetail =  mappingDetail;
         this.topic = topic;
         this.bname = bname;
         this.globalId = globalId;
@@ -52,29 +54,35 @@ public class TopicQueueMappingOne extends RemotingSerializable {
         return items;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
+    public TopicQueueMappingDetail getMappingDetail() {
+        return mappingDetail;
+    }
 
-        if (!(o instanceof TopicQueueMappingOne)) return false;
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (!(o instanceof TopicQueueMappingOne))
+            return false;
 
         TopicQueueMappingOne that = (TopicQueueMappingOne) o;
 
-        return new EqualsBuilder()
-                .append(topic, that.topic)
-                .append(bname, that.bname)
-                .append(globalId, that.globalId)
-                .append(items, that.items)
-                .isEquals();
+        if (topic != null ? !topic.equals(that.topic) : that.topic != null)
+            return false;
+        if (bname != null ? !bname.equals(that.bname) : that.bname != null)
+            return false;
+        if (globalId != null ? !globalId.equals(that.globalId) : that.globalId != null)
+            return false;
+        if (items != null ? !items.equals(that.items) : that.items != null)
+            return false;
+        return mappingDetail != null ? mappingDetail.equals(that.mappingDetail) : that.mappingDetail == null;
     }
 
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder(17, 37)
-                .append(topic)
-                .append(bname)
-                .append(globalId)
-                .append(items)
-                .toHashCode();
+    @Override public int hashCode() {
+        int result = topic != null ? topic.hashCode() : 0;
+        result = 31 * result + (bname != null ? bname.hashCode() : 0);
+        result = 31 * result + (globalId != null ? globalId.hashCode() : 0);
+        result = 31 * result + (items != null ? items.hashCode() : 0);
+        result = 31 * result + (mappingDetail != null ? mappingDetail.hashCode() : 0);
+        return result;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 6a1c39c..bf02ccd 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -371,7 +371,7 @@ public class TopicQueueMappingUtils {
                         throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
                     }
                 } else {
-                    globalIdMap.put(globalid, new TopicQueueMappingOne(mappingDetail.topic, mappingDetail.bname, globalid, entry.getValue()));
+                    globalIdMap.put(globalid, new TopicQueueMappingOne(mappingDetail, mappingDetail.topic, mappingDetail.bname, globalid, entry.getValue()));
                 }
             }
         }
@@ -384,8 +384,8 @@ public class TopicQueueMappingUtils {
                     throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
                 }
             }
-            checkIfReusePhysicalQueue(globalIdMap.values());
         }
+        checkIfReusePhysicalQueue(globalIdMap.values());
         return globalIdMap;
     }
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index fce4318..b7e4816 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -16,6 +16,19 @@
  */
 package org.apache.rocketmq.tools.admin;
 
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -64,7 +77,11 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -78,30 +95,11 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.apache.rocketmq.tools.admin.api.TrackType;
 
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
+import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.checkAndBuildMappingItems;
+import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
 
 public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
-    private final InternalLogger log = ClientLogger.getLog();
-    private final DefaultMQAdminExt defaultMQAdminExt;
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
-    private MQClientInstance mqClientInstance;
-    private RPCHook rpcHook;
-    private long timeoutMillis = 20000;
-    private Random random = new Random();
-
     private static final Set<String> SYSTEM_GROUP_SET = new HashSet<String>();
 
     static {
@@ -120,6 +118,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS);
     }
 
+    private final InternalLogger log = ClientLogger.getLog();
+    private final DefaultMQAdminExt defaultMQAdminExt;
+    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private MQClientInstance mqClientInstance;
+    private RPCHook rpcHook;
+    private long timeoutMillis = 20000;
+    private Random random = new Random();
+
     public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) {
         this(defaultMQAdminExt, null, timeoutMillis);
     }
@@ -245,7 +251,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+    public TopicConfig examineTopicConfig(String addr,
+        String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
     }
 
@@ -264,6 +271,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             }
         }
 
+        //Get the static stats
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(topic, topicRouteData, defaultMQAdminExt);
+        MQAdminUtils.convertPhysicalTopicStats(topic, brokerConfigMap, topicStatsTable);
 
         if (topicStatsTable.getOffsetTable().isEmpty()) {
             throw new MQClientException("Not found the topic stats info", null);
@@ -272,10 +282,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return topicStatsTable;
     }
 
-
     @Override
-    public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException,
-            MQBrokerException {
+    public TopicStatsTable examineTopicStats(String brokerAddr,
+        String topic) throws RemotingException, MQClientException, InterruptedException,
+        MQBrokerException {
         return this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis);
     }
 
@@ -323,12 +333,29 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             }
         }
 
-        if (result.getOffsetTable().isEmpty()) {
+        Set<String> topics = new HashSet<>();
+        for (MessageQueue messageQueue: result.getOffsetTable().keySet()) {
+            topics.add(messageQueue.getTopic());
+        }
+
+        ConsumeStats staticResult = new ConsumeStats();
+        staticResult.setConsumeTps(result.getConsumeTps());
+        // for topic, we put the physical stats, how about group?
+        // staticResult.getOffsetTable().putAll(result.getOffsetTable());
+
+        for (String currentTopic: topics) {
+            TopicRouteData currentRoute = this.examineTopicRouteInfo(currentTopic);
+            Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, defaultMQAdminExt);
+            ConsumeStats consumeStats = MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result);
+            staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable());
+        }
+
+        if (staticResult.getOffsetTable().isEmpty()) {
             throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
                 "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
         }
 
-        return result;
+        return staticResult;
     }
 
     @Override
@@ -355,7 +382,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId);
     }
 
-
     @Override
     public ConsumerConnection examineConsumerConnectionInfo(
         String consumerGroup) throws InterruptedException, MQBrokerException,
@@ -416,7 +442,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
     @Override
     public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException,
-            RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+        RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
         return this.mqClientInstance.getMQClientAPIImpl().addWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis);
     }
 
@@ -1047,11 +1073,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException,  InterruptedException, MQBrokerException {
+    public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
+        final TopicQueueMappingDetail mappingDetail,
+        final boolean force) throws RemotingException, InterruptedException, MQBrokerException {
         this.mqClientInstance.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
     }
 
-
     @Override
     public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
@@ -1062,7 +1089,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
     }
 
-
     @Override
     public long minOffset(MessageQueue mq) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().minOffset(mq);
@@ -1150,7 +1176,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
     @Override
     public void setMessageRequestMode(final String brokerAddr, final String topic, final String consumerGroup, final
-        MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis)
+    MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis)
         throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQClientException {
         this.mqClientInstance.getMQClientAPIImpl().setMessageRequestMode(brokerAddr, topic, consumerGroup, mode, popShareQueueNum, timeoutMillis);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index a5aab4d..cd2c4ac 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -19,16 +19,20 @@ package org.apache.rocketmq.tools.admin;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -42,6 +46,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.checkAndBuildMappingItems;
+import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
+
 public class MQAdminUtils {
 
 
@@ -229,4 +236,107 @@ public class MQAdminUtils {
         }
         return brokerConfigMap;
     }
+
+
+    public static Map<String, TopicConfigAndQueueMapping> examineTopicConfigFromRoute(String topic, TopicRouteData topicRouteData, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException,  InterruptedException, MQBrokerException {
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
+        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+            String broker = bd.getBrokerName();
+            String addr = bd.selectBrokerAddr();
+            if (addr == null) {
+                continue;
+            }
+            try {
+                TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+                //allow the config is null
+                if (mapping != null) {
+                    if (mapping.getMappingDetail() != null) {
+                        assert mapping.getMappingDetail().getBname().equals(broker);
+                    }
+                    brokerConfigMap.put(broker, mapping);
+                }
+            } catch (MQBrokerException exception) {
+                if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+                    throw exception;
+                }
+            }
+        }
+        return brokerConfigMap;
+    }
+
+    public static void convertPhysicalTopicStats(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, TopicStatsTable topicStatsTable) {
+        Map<Integer, TopicQueueMappingOne> globalIdMap = checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), true, false);
+        for (Map.Entry<Integer, TopicQueueMappingOne> entry: globalIdMap.entrySet()) {
+            Integer qid = entry.getKey();
+            TopicQueueMappingOne mappingOne =  entry.getValue();
+            LogicQueueMappingItem minItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingOne.getItems(), 0, true);
+            LogicQueueMappingItem maxItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingOne.getItems(), Long.MAX_VALUE, true);
+            assert  minItem != null && maxItem != null;
+            TopicOffset minTopicOffset = topicStatsTable.getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId()));
+            TopicOffset maxTopicOffset = topicStatsTable.getOffsetTable().get(new MessageQueue(topic, maxItem.getBname(), maxItem.getQueueId()));
+
+            if (minTopicOffset == null
+                || maxTopicOffset == null) {
+                continue;
+            }
+            long min = minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset());
+            if (min < 0)
+                min = 0;
+            long max = maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset());
+            if (max < 0)
+                max = 0;
+            long timestamp = maxTopicOffset.getLastUpdateTimestamp();
+
+            TopicOffset topicOffset = new TopicOffset();
+            topicOffset.setMinOffset(min);
+            topicOffset.setMaxOffset(max);
+            topicOffset.setLastUpdateTimestamp(timestamp);
+            topicStatsTable.getOffsetTable().put(new MessageQueue(topic, TopicQueueMappingUtils.getMockBrokerName(mappingOne.getMappingDetail().getScope()), qid), topicOffset);
+        }
+    }
+
+
+    public static ConsumeStats convertPhysicalConsumeStats(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ConsumeStats physicalResult) {
+        Map<Integer, TopicQueueMappingOne> globalIdMap = checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), true, false);
+        ConsumeStats result = new ConsumeStats();
+        result.setConsumeTps(physicalResult.getConsumeTps());
+        for (Map.Entry<Integer, TopicQueueMappingOne> entry : globalIdMap.entrySet()) {
+            Integer qid = entry.getKey();
+            TopicQueueMappingOne mappingOne = entry.getValue();
+            MessageQueue messageQueue = new MessageQueue(mappingOne.getTopic(), TopicQueueMappingUtils.getMockBrokerName(mappingOne.getMappingDetail().getScope()), qid);
+            OffsetWrapper offsetWrapper = new OffsetWrapper();
+            long brokerOffset = -1;
+            long consumerOffset = -1;
+            long lastTimestamp = -1; //maybe need to be polished
+            for (int i = mappingOne.getItems().size() - 1; i >= 0; i--) {
+                LogicQueueMappingItem item = mappingOne.getItems().get(i);
+                MessageQueue phyQueue = new MessageQueue(mappingOne.getTopic(), item.getBname(), item.getQueueId());
+                OffsetWrapper phyOffsetWrapper = physicalResult.getOffsetTable().get(phyQueue);
+                if (phyOffsetWrapper == null) {
+                    continue;
+                }
+                if (consumerOffset == -1
+                    && phyOffsetWrapper.getConsumerOffset() >= 0) {
+                    consumerOffset = phyOffsetWrapper.getConsumerOffset();
+                    lastTimestamp = phyOffsetWrapper.getLastTimestamp();
+                }
+                if (brokerOffset == -1
+                    && item.getLogicOffset() >= 0) {
+                    brokerOffset = item.computeStaticQueueOffsetStrictly(offsetWrapper.getBrokerOffset());
+                }
+                if (consumerOffset >= 0
+                    && brokerOffset >= 0) {
+                    break;
+                }
+            }
+            if (brokerOffset >= 0
+                && consumerOffset >= 0) {
+                offsetWrapper.setBrokerOffset(brokerOffset);
+                offsetWrapper.setConsumerOffset(consumerOffset);
+                offsetWrapper.setLastTimestamp(lastTimestamp);
+                result.getOffsetTable().put(messageQueue, offsetWrapper);
+            }
+        }
+        return result;
+    }
 }

[rocketmq] 01/03: Polish doc, add todo list

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 47622e6ad56c1811e835dd439eb81d653ea221a3
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 23 15:40:01 2021 +0800

    Polish doc, add todo list
---
 .../common/statictopic/TopicQueueMappingUtils.java |  2 +-
 ..._Topic_Logic_Queue_\350\256\276\350\256\241.md" | 31 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 75f0a4f..6a1c39c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -384,8 +384,8 @@ public class TopicQueueMappingUtils {
                     throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
                 }
             }
+            checkIfReusePhysicalQueue(globalIdMap.values());
         }
-        checkIfReusePhysicalQueue(globalIdMap.values());
         return globalIdMap;
     }
 
diff --git "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
index bf59688..c06d83f 100644
--- "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
+++ "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
@@ -6,6 +6,8 @@
 | 2021-12-01 | 更新问题与风险,增加关于一致性、OutOfRange、拉取中断的详细说明| dongforever |
 | 2021-12-03 | 增加代码走读的说明| dongforever |
 | 2021-12-10 | 引入Scope概念,保留『多集群动态零耦合』的集群设计模型 | dongforever |
+| 2021-12-23 | 梳理待完成事项;讨论Admin接口的适配方式 | dongforever |
+
 
 
 中文文档在描述特定专业术语时,仍然使用英文。
@@ -424,6 +426,15 @@ Command操作时,提前预判Master是否存在,如果不存在,则提前
 #### pullResult 位点由谁设置的问题
 类似于Batch,由客户端设置,避免服务端解开消息:  
 在PullResultExt中新增字段 offsetDelta。
+#### Admin接口与User接口的适配方式区别
+User 接口,使用范围广泛如多语言等,应该尽可能简单,把适配逻辑做在服务端,对『客户端』透明。  
+那么 Admin 接口呢,比如 examineTopicStats,适配逻辑是做在『服务端』还是『客户端』?  
+一个 Admin 接口,通常需要访问所有 Broker 的所有队列。
+如果做在服务端,则可能产生交叉访问,在极端情况下,性能会非常差,举个例子:  
+100 个 Broker,相互交叉映射过一遍,则Admin客户端首先要向 100 个 Broker 发请求,然后这 100 个 Broker 为了获取远程信息,各自向其余 Broker 发请求。
+其实际网络请求数就是 100 * 100 = 10000 个网络请求。放大效应十分明显。  
+同时, 考虑到 Admin 接口,使用范围是有限的,无需考虑多语言适配等问题,可以把适配逻辑做在 Admin 客户端。
+
 #### 远程读的性能问题
 从实战经验来看,性能损耗几乎不计。
 #### 使用习惯的改变
@@ -437,6 +448,26 @@ Command操作时,提前预判Master是否存在,如果不存在,则提前
 二阶消息需要支持远程读写操作。
 一期的LogicQueue不支持『二阶消息』。
 
+### 待完成事项
+#### 阻止旧客户端的请求
+旧的客户端无法解析逻辑路由,但可以识别物理路由。如果错误使用,则会影响映射关系的准确性。
+#### 阻止Pop模式、事务消息、定时消息使用 LogicQueue
+不兼容 事务消息和定时消息。  
+LogicQueue 当前不支持Pop模式消费。
+#### ConsumeQueue 的 correctMinOffset 逻辑存在缺陷
+可能导致 LogicQueue 无法清除已经过期的 MappingItem。
+#### getOffsetInQueueByTime 语义有缺陷
+可能导致 LogicQueue 时间搜索不准确。需要专项修复。
+#### Metadata 更新机制
+当前的更新机制太慢。且访问『不存在Broker』时,会频繁访问Nameserver,有打爆Nameserver的风险。
+#### examineConsumeStats 接口获取不到『最近消费时间』
+位点相关的消息可能不在本机,需要远程访问。
+#### resetOffset 需要适配
+当前没有适配。重置位点,可能会得到不符合预期的结果。
+#### MessageQueue 没有被物理清除
+当前只是产生遗留数据,占用一点点存储空间,没有太大影响。  
+如果将来要实现 物理 Queue 复用,则需要先完善相关逻辑。
+
 ### 代码走读要点
 #### Admin 入口
 主要看两个类: