You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/10 12:48:49 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish and add some test

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new f5285b0  Polish and add some test
f5285b0 is described below

commit f5285b008ada78413123ae6da1ec8d3b506f0f62
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 10 20:48:39 2021 +0800

    Polish and add some test
---
 .../broker/processor/PullMessageProcessor.java     | 24 +++++-------
 .../broker/processor/SendMessageProcessor.java     |  2 +-
 .../rocketmq/common/LogicQueueMappingItem.java     | 40 ++++++++------------
 .../rocketmq/common/TopicQueueMappingDetail.java   | 43 +++++-----------------
 .../rocketmq/common/TopicQueueMappingInfo.java     |  4 +-
 .../rocketmq/common/TopicQueueMappingTest.java     | 42 +++++++++++++++++++++
 .../org/apache/rocketmq/common/UtilAllTest.java    |  3 ++
 7 files changed, 84 insertions(+), 74 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 5ab3c01..8d7758a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -36,9 +36,7 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
-import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.common.LogicQueueMappingItem;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -56,8 +54,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
@@ -126,7 +122,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
         Integer globalId = requestHeader.getQueueId();
         Long globalOffset = requestHeader.getQueueOffset();
 
-        LogicQueueMappingItem mappingItem = mappingDetail.getLogicQueueMappingItem(globalId, globalOffset);
+        LogicQueueMappingItem mappingItem = mappingDetail.findLogicQueueMappingItem(globalId, globalOffset);
         return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem);
     }
 
@@ -153,10 +149,10 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
             //below are physical info
             String bname = mappingItem.getBname();
             Integer phyQueueId = mappingItem.getQueueId();
-            Long phyQueueOffset = mappingItem.convertToPhysicalQueueOffset(globalOffset);
+            Long phyQueueOffset = mappingItem.computePhysicalQueueOffset(globalOffset);
             requestHeader.setQueueId(phyQueueId);
             requestHeader.setQueueOffset(phyQueueOffset);
-            if (mappingItem.isEndOffsetDecided()
+            if (mappingItem.checkIfEndOffsetDecided()
                     && requestHeader.getMaxMsgNums() != null) {
                 requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
             }
@@ -205,24 +201,24 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                 long nextBeginOffset = responseHeader.getNextBeginOffset();
                 assert nextBeginOffset >= requestHeader.getQueueOffset();
                 //the next begin offset should no more than the end offset
-                if (mappingItem.isEndOffsetDecided()
+                if (mappingItem.checkIfEndOffsetDecided()
                         && nextBeginOffset >= mappingItem.getEndOffset()) {
                     nextBeginOffset = mappingItem.getEndOffset();
                 }
-                responseHeader.setNextBeginOffset(mappingItem.convertToStaticQueueOffset(nextBeginOffset));
+                responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffset(nextBeginOffset));
             }
             //handle min offset
-            responseHeader.setMinOffset(mappingItem.convertToStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
+            responseHeader.setMinOffset(mappingItem.computeStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
             //handle max offset
             {
-                if (mappingItem.isEndOffsetDecided()) {
-                    responseHeader.setMaxOffset(Math.max(mappingItem.convertToMaxStaticQueueOffset(), mappingDetail.getMaxOffsetFromMapping(mappingContext.getGlobalId())));
+                if (mappingItem.checkIfEndOffsetDecided()) {
+                    responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId())));
                 } else {
-                    responseHeader.setMaxOffset(mappingItem.convertToStaticQueueOffset(responseHeader.getMaxOffset()));
+                    responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffset(responseHeader.getMaxOffset()));
                 }
             }
             //set the offsetDelta
-            responseHeader.setOffsetDelta(mappingItem.convertOffsetDelta());
+            responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta());
         } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 62e2828..52508a4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -174,7 +174,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             }
             TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
 
-            long staticLogicOffset = mappingDetail.convertToLogicOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset());
+            long staticLogicOffset = mappingDetail.computeStaticQueueOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset());
             if (staticLogicOffset < 0) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
             }
diff --git a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index 0a9ee96..8d1b164 100644
--- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -8,42 +8,45 @@ public class LogicQueueMappingItem {
     private long logicOffset; // the start of the logic offset
     private long startOffset; // the start of the physical offset
     private long endOffset; // the end of the physical offset
-    private long timeOfStart = -1; //mutable
+    private long timeOfStart = -1; // mutable
+    private long timeOfEnd = -1; // mutable
 
-    public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long timeOfStart) {
+    public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) {
         this.gen = gen;
         this.queueId = queueId;
         this.bname = bname;
         this.logicOffset = logicOffset;
         this.startOffset = startOffset;
+        this.endOffset = endOffset;
         this.timeOfStart = timeOfStart;
+        this.timeOfEnd = timeOfEnd;
     }
 
-    public long convertToStaticQueueOffset(long physicalQueueOffset) {
+    public long computeStaticQueueOffset(long physicalQueueOffset) {
         return  logicOffset + (physicalQueueOffset - startOffset);
     }
 
-    public long convertToPhysicalQueueOffset(long staticQueueOffset) {
+    public long computePhysicalQueueOffset(long staticQueueOffset) {
         return  (staticQueueOffset - logicOffset) + startOffset;
     }
 
-    public long convertToMaxStaticQueueOffset() {
+    public long computeMaxStaticQueueOffset() {
         if (endOffset >= startOffset) {
             return logicOffset + endOffset - startOffset;
         } else {
             return logicOffset;
         }
     }
-    public boolean isShouldDeleted() {
+    public boolean checkIfShouldDeleted() {
         return endOffset == startOffset;
     }
 
-    public boolean isEndOffsetDecided() {
+    public boolean checkIfEndOffsetDecided() {
         //if the endOffset == startOffset, then the item should be deleted
         return endOffset > startOffset;
     }
 
-    public long convertOffsetDelta() {
+    public long computeOffsetDelta() {
         return logicOffset - startOffset;
     }
 
@@ -51,20 +54,6 @@ public class LogicQueueMappingItem {
         return gen;
     }
 
-    public void setGen(int gen) {
-        this.gen = gen;
-    }
-
-
-    public long getTimeOfStart() {
-        return timeOfStart;
-    }
-
-    public void setTimeOfStart(long timeOfStart) {
-        this.timeOfStart = timeOfStart;
-    }
-
-
     public int getQueueId() {
         return queueId;
     }
@@ -85,8 +74,11 @@ public class LogicQueueMappingItem {
         return endOffset;
     }
 
+    public long getTimeOfStart() {
+        return timeOfStart;
+    }
 
-    public void setEndOffset(long endOffset) {
-        this.endOffset = endOffset;
+    public long getTimeOfEnd() {
+        return timeOfEnd;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 8c2aad9..8d5c8e8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentMap;
 public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
 
     // the mapping info in current broker, do not register to nameserver
-    ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+    private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
 
     public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
         super(topic, totalQueues, bname);
@@ -47,16 +47,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
         this.prevIdMap = buildIdMap(LEVEL_1);
     }
 
-    public ConcurrentMap<Integer, Integer> revert(ConcurrentMap<Integer, Integer> original) {
-        if (original == null || original.isEmpty()) {
-            return new ConcurrentHashMap<Integer, Integer>();
-        }
-        ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
-        for (Map.Entry<Integer, Integer> entry: tmpIdMap.entrySet()) {
-            tmpIdMap.put(entry.getValue(), entry.getKey());
-        }
-        return tmpIdMap;
-    }
 
     public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
         //level 0 means current leader in this broker
@@ -92,24 +82,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
     }
 
 
-    public long convertToLogicOffset(Integer globalId, long physicalLogicOffset) {
+    public long computeStaticQueueOffset(Integer globalId, long physicalLogicOffset) {
         List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
         if (mappingItems == null
                 || mappingItems.isEmpty()) {
             return -1;
         }
         if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) {
-            return mappingItems.get(mappingItems.size() - 1).convertToStaticQueueOffset(physicalLogicOffset);
+            return mappingItems.get(mappingItems.size() - 1).computeStaticQueueOffset(physicalLogicOffset);
         }
         //Consider the "switch" process, reduce the error
         if (mappingItems.size() >= 2
             && bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) {
-            return mappingItems.get(mappingItems.size() - 2).convertToStaticQueueOffset(physicalLogicOffset);
+            return mappingItems.get(mappingItems.size() - 2).computeStaticQueueOffset(physicalLogicOffset);
         }
         return -1;
     }
 
-    public LogicQueueMappingItem getLogicQueueMappingItem(Integer globalId, long logicOffset) {
+    public LogicQueueMappingItem findLogicQueueMappingItem(Integer globalId, long logicOffset) {
         List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
         if (mappingItems == null
                 || mappingItems.isEmpty()) {
@@ -124,21 +114,21 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
         }
         //if not found, maybe out of range, return the first one
         for (int i = 0; i < mappingItems.size(); i++) {
-            if (!mappingItems.get(i).isShouldDeleted()) {
+            if (!mappingItems.get(i).checkIfShouldDeleted()) {
                 return mappingItems.get(i);
             }
         }
         return null;
     }
 
-    public long getMaxOffsetFromMapping(Integer globalId) {
+    public long computeMaxOffsetFromMapping(Integer globalId) {
         List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
         if (mappingItems == null
                 || mappingItems.isEmpty()) {
             return -1;
         }
         LogicQueueMappingItem item =  mappingItems.get(mappingItems.size() - 1);
-        return item.convertToMaxStaticQueueOffset();
+        return item.computeMaxStaticQueueOffset();
     }
 
 
@@ -150,20 +140,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
         return topicQueueMappingInfo;
     }
 
-
-    public int getTotalQueues() {
-        return totalQueues;
-    }
-
-    public void setTotalQueues(int totalQueues) {
-        this.totalQueues = totalQueues;
-    }
-
-    public String getBname() {
-        return bname;
-    }
-
-    public String getTopic() {
-        return topic;
+    public ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> getHostedQueues() {
+        return hostedQueues;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 2d76365..b4a92f3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -29,9 +29,9 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     int totalQueues;
     String bname;  //identify the hosted broker name
     //register to broker to construct the route
-    ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
+    transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
     //register to broker to help detect remapping failure
-    ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
+    transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
 
     public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
         this.topic = topic;
diff --git a/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java
new file mode 100644
index 0000000..8e125ea
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java
@@ -0,0 +1,42 @@
+package org.apache.rocketmq.common;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TopicQueueMappingTest {
+
+    @Test
+    public void testJsonSerialize() {
+        LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
+        String mappingItemJson = JSON.toJSONString(mappingItem) ;
+        System.out.println(mappingItemJson);
+
+        Map<String, Object> mappingItemMap = JSON.parseObject(mappingItemJson, Map.class);
+        Assert.assertEquals(8, mappingItemMap.size());
+        Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname());
+        Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen());
+        Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset());
+        Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId());
+        Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset());
+        Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset());
+        Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
+        Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
+
+        TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01");
+        mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
+
+        String mappingDetailJson = JSON.toJSONString(mappingDetail);
+        Map  mappingDetailMap = JSON.parseObject(mappingDetailJson);
+        Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap"));
+        Assert.assertFalse(mappingDetailMap.containsKey("currIdMap"));
+        Assert.assertEquals(4, mappingDetailMap.size());
+        Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size());
+        Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size());
+    }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index 32d2ba6..7ef7eac 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -23,6 +23,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableList;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;