You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/10 12:48:49 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish and add some test
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new f5285b0 Polish and add some test
f5285b0 is described below
commit f5285b008ada78413123ae6da1ec8d3b506f0f62
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 10 20:48:39 2021 +0800
Polish and add some test
---
.../broker/processor/PullMessageProcessor.java | 24 +++++-------
.../broker/processor/SendMessageProcessor.java | 2 +-
.../rocketmq/common/LogicQueueMappingItem.java | 40 ++++++++------------
.../rocketmq/common/TopicQueueMappingDetail.java | 43 +++++-----------------
.../rocketmq/common/TopicQueueMappingInfo.java | 4 +-
.../rocketmq/common/TopicQueueMappingTest.java | 42 +++++++++++++++++++++
.../org/apache/rocketmq/common/UtilAllTest.java | 3 ++
7 files changed, 84 insertions(+), 74 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 5ab3c01..8d7758a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -36,9 +36,7 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
-import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -56,8 +54,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
@@ -126,7 +122,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
Integer globalId = requestHeader.getQueueId();
Long globalOffset = requestHeader.getQueueOffset();
- LogicQueueMappingItem mappingItem = mappingDetail.getLogicQueueMappingItem(globalId, globalOffset);
+ LogicQueueMappingItem mappingItem = mappingDetail.findLogicQueueMappingItem(globalId, globalOffset);
return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem);
}
@@ -153,10 +149,10 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
//below are physical info
String bname = mappingItem.getBname();
Integer phyQueueId = mappingItem.getQueueId();
- Long phyQueueOffset = mappingItem.convertToPhysicalQueueOffset(globalOffset);
+ Long phyQueueOffset = mappingItem.computePhysicalQueueOffset(globalOffset);
requestHeader.setQueueId(phyQueueId);
requestHeader.setQueueOffset(phyQueueOffset);
- if (mappingItem.isEndOffsetDecided()
+ if (mappingItem.checkIfEndOffsetDecided()
&& requestHeader.getMaxMsgNums() != null) {
requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
}
@@ -205,24 +201,24 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
long nextBeginOffset = responseHeader.getNextBeginOffset();
assert nextBeginOffset >= requestHeader.getQueueOffset();
//the next begin offset should no more than the end offset
- if (mappingItem.isEndOffsetDecided()
+ if (mappingItem.checkIfEndOffsetDecided()
&& nextBeginOffset >= mappingItem.getEndOffset()) {
nextBeginOffset = mappingItem.getEndOffset();
}
- responseHeader.setNextBeginOffset(mappingItem.convertToStaticQueueOffset(nextBeginOffset));
+ responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffset(nextBeginOffset));
}
//handle min offset
- responseHeader.setMinOffset(mappingItem.convertToStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
+ responseHeader.setMinOffset(mappingItem.computeStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
//handle max offset
{
- if (mappingItem.isEndOffsetDecided()) {
- responseHeader.setMaxOffset(Math.max(mappingItem.convertToMaxStaticQueueOffset(), mappingDetail.getMaxOffsetFromMapping(mappingContext.getGlobalId())));
+ if (mappingItem.checkIfEndOffsetDecided()) {
+ responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId())));
} else {
- responseHeader.setMaxOffset(mappingItem.convertToStaticQueueOffset(responseHeader.getMaxOffset()));
+ responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffset(responseHeader.getMaxOffset()));
}
}
//set the offsetDelta
- responseHeader.setOffsetDelta(mappingItem.convertOffsetDelta());
+ responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta());
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 62e2828..52508a4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -174,7 +174,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
- long staticLogicOffset = mappingDetail.convertToLogicOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset());
+ long staticLogicOffset = mappingDetail.computeStaticQueueOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index 0a9ee96..8d1b164 100644
--- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -8,42 +8,45 @@ public class LogicQueueMappingItem {
private long logicOffset; // the start of the logic offset
private long startOffset; // the start of the physical offset
private long endOffset; // the end of the physical offset
- private long timeOfStart = -1; //mutable
+ private long timeOfStart = -1; // mutable
+ private long timeOfEnd = -1; // mutable
- public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long timeOfStart) {
+ public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) {
this.gen = gen;
this.queueId = queueId;
this.bname = bname;
this.logicOffset = logicOffset;
this.startOffset = startOffset;
+ this.endOffset = endOffset;
this.timeOfStart = timeOfStart;
+ this.timeOfEnd = timeOfEnd;
}
- public long convertToStaticQueueOffset(long physicalQueueOffset) {
+ public long computeStaticQueueOffset(long physicalQueueOffset) {
return logicOffset + (physicalQueueOffset - startOffset);
}
- public long convertToPhysicalQueueOffset(long staticQueueOffset) {
+ public long computePhysicalQueueOffset(long staticQueueOffset) {
return (staticQueueOffset - logicOffset) + startOffset;
}
- public long convertToMaxStaticQueueOffset() {
+ public long computeMaxStaticQueueOffset() {
if (endOffset >= startOffset) {
return logicOffset + endOffset - startOffset;
} else {
return logicOffset;
}
}
- public boolean isShouldDeleted() {
+ public boolean checkIfShouldDeleted() {
return endOffset == startOffset;
}
- public boolean isEndOffsetDecided() {
+ public boolean checkIfEndOffsetDecided() {
//if the endOffset == startOffset, then the item should be deleted
return endOffset > startOffset;
}
- public long convertOffsetDelta() {
+ public long computeOffsetDelta() {
return logicOffset - startOffset;
}
@@ -51,20 +54,6 @@ public class LogicQueueMappingItem {
return gen;
}
- public void setGen(int gen) {
- this.gen = gen;
- }
-
-
- public long getTimeOfStart() {
- return timeOfStart;
- }
-
- public void setTimeOfStart(long timeOfStart) {
- this.timeOfStart = timeOfStart;
- }
-
-
public int getQueueId() {
return queueId;
}
@@ -85,8 +74,11 @@ public class LogicQueueMappingItem {
return endOffset;
}
+ public long getTimeOfStart() {
+ return timeOfStart;
+ }
- public void setEndOffset(long endOffset) {
- this.endOffset = endOffset;
+ public long getTimeOfEnd() {
+ return timeOfEnd;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 8c2aad9..8d5c8e8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentMap;
public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver
- ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+ private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname);
@@ -47,16 +47,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
this.prevIdMap = buildIdMap(LEVEL_1);
}
- public ConcurrentMap<Integer, Integer> revert(ConcurrentMap<Integer, Integer> original) {
- if (original == null || original.isEmpty()) {
- return new ConcurrentHashMap<Integer, Integer>();
- }
- ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
- for (Map.Entry<Integer, Integer> entry: tmpIdMap.entrySet()) {
- tmpIdMap.put(entry.getValue(), entry.getKey());
- }
- return tmpIdMap;
- }
public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
//level 0 means current leader in this broker
@@ -92,24 +82,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
}
- public long convertToLogicOffset(Integer globalId, long physicalLogicOffset) {
+ public long computeStaticQueueOffset(Integer globalId, long physicalLogicOffset) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
if (mappingItems == null
|| mappingItems.isEmpty()) {
return -1;
}
if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) {
- return mappingItems.get(mappingItems.size() - 1).convertToStaticQueueOffset(physicalLogicOffset);
+ return mappingItems.get(mappingItems.size() - 1).computeStaticQueueOffset(physicalLogicOffset);
}
//Consider the "switch" process, reduce the error
if (mappingItems.size() >= 2
&& bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) {
- return mappingItems.get(mappingItems.size() - 2).convertToStaticQueueOffset(physicalLogicOffset);
+ return mappingItems.get(mappingItems.size() - 2).computeStaticQueueOffset(physicalLogicOffset);
}
return -1;
}
- public LogicQueueMappingItem getLogicQueueMappingItem(Integer globalId, long logicOffset) {
+ public LogicQueueMappingItem findLogicQueueMappingItem(Integer globalId, long logicOffset) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
if (mappingItems == null
|| mappingItems.isEmpty()) {
@@ -124,21 +114,21 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
}
//if not found, maybe out of range, return the first one
for (int i = 0; i < mappingItems.size(); i++) {
- if (!mappingItems.get(i).isShouldDeleted()) {
+ if (!mappingItems.get(i).checkIfShouldDeleted()) {
return mappingItems.get(i);
}
}
return null;
}
- public long getMaxOffsetFromMapping(Integer globalId) {
+ public long computeMaxOffsetFromMapping(Integer globalId) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
if (mappingItems == null
|| mappingItems.isEmpty()) {
return -1;
}
LogicQueueMappingItem item = mappingItems.get(mappingItems.size() - 1);
- return item.convertToMaxStaticQueueOffset();
+ return item.computeMaxStaticQueueOffset();
}
@@ -150,20 +140,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return topicQueueMappingInfo;
}
-
- public int getTotalQueues() {
- return totalQueues;
- }
-
- public void setTotalQueues(int totalQueues) {
- this.totalQueues = totalQueues;
- }
-
- public String getBname() {
- return bname;
- }
-
- public String getTopic() {
- return topic;
+ public ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> getHostedQueues() {
+ return hostedQueues;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 2d76365..b4a92f3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -29,9 +29,9 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
int totalQueues;
String bname; //identify the hosted broker name
//register to broker to construct the route
- ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
+ transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure
- ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
+ transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
this.topic = topic;
diff --git a/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java
new file mode 100644
index 0000000..8e125ea
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java
@@ -0,0 +1,42 @@
+package org.apache.rocketmq.common;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TopicQueueMappingTest {
+
+ @Test
+ public void testJsonSerialize() {
+ LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
+ String mappingItemJson = JSON.toJSONString(mappingItem) ;
+ System.out.println(mappingItemJson);
+
+ Map<String, Object> mappingItemMap = JSON.parseObject(mappingItemJson, Map.class);
+ Assert.assertEquals(8, mappingItemMap.size());
+ Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname());
+ Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen());
+ Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset());
+ Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId());
+ Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset());
+ Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset());
+ Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
+ Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
+
+ TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01");
+ mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
+
+ String mappingDetailJson = JSON.toJSONString(mappingDetail);
+ Map mappingDetailMap = JSON.parseObject(mappingDetailJson);
+ Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap"));
+ Assert.assertFalse(mappingDetailMap.containsKey("currIdMap"));
+ Assert.assertEquals(4, mappingDetailMap.size());
+ Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size());
+ Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size());
+ }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index 32d2ba6..7ef7eac 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -23,6 +23,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableList;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;