You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2022/06/22 09:52:35 UTC
[rocketmq] branch develop updated: [ISSUE#4468] Optimize broker buffer length initialization (#4469)
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new da01deb96 [ISSUE#4468] Optimize broker buffer length initialization (#4469)
da01deb96 is described below
commit da01deb961807c68b102aeccb1f06d9721ca700e
Author: Shengmin Wang <10...@users.noreply.github.com>
AuthorDate: Wed Jun 22 17:52:28 2022 +0800
[ISSUE#4468] Optimize broker buffer length initialization (#4469)
* optimize broker buffer length initialization
* delete constructor with MaxMessageSize parameter in commitLog
* add junit test testEncodeLongMessage()
* Modify unit test to remove the use of reflection
* [maven-release-plugin] prepare release rocketmq-all-4.9.4
* [maven-release-plugin] prepare for next development iteration
* add a blank line to do a new CI test
* add JUnit test cases for some illegal messages
* modify batch message test, using assertThrows
Co-authored-by: hill007299 <hi...@126.com>
---
.../java/org/apache/rocketmq/store/CommitLog.java | 19 ++++++++---
.../apache/rocketmq/store/BatchPutMessageTest.java | 26 +++++++++++++++
.../rocketmq/store/DefaultMessageStoreTest.java | 39 ++++++++++++++++++++++
3 files changed, 80 insertions(+), 4 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index ed0267b1b..8b8a86315 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1482,11 +1482,15 @@ public class CommitLog {
private final ByteBuf byteBuf;
// The maximum length of the message body.
private final int maxMessageBodySize;
-
+ // The maximum length of the full message.
+ private final int maxMessageSize;
MessageExtEncoder(final int maxMessageBodySize) {
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
- byteBuf = alloc.directBuffer(maxMessageBodySize);
+ //Reserve an extra 64KB on top of the max body size for the parts of the encoded message outside the body
+ int maxMessageSize = maxMessageBodySize + 64 * 1024;
+ byteBuf = alloc.directBuffer(maxMessageSize); // buffer is sized for the full message, not only the body
this.maxMessageBodySize = maxMessageBodySize;
+ this.maxMessageSize = maxMessageSize;
}
protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
@@ -1511,13 +1515,20 @@ public class CommitLog {
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
- // Exceeds the maximum message
+ // Exceeds the maximum message body
if (bodyLength > this.maxMessageBodySize) {
CommitLog.log.warn("message body size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageBodySize);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
+ // Exceeds the maximum message
+ if (msgLen > this.maxMessageSize) {
+ CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ + ", maxMessageSize: " + this.maxMessageSize);
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+ }
+
// 1 TOTALSIZE
this.byteBuf.writeInt(msgLen);
// 2 MAGICCODE
@@ -1575,7 +1586,7 @@ public class CommitLog {
int totalLength = messagesByteBuff.limit();
if (totalLength > this.maxMessageBodySize) {
CommitLog.log.warn("message body size exceeded, msg body size: " + totalLength + ", maxMessageSize: " + this.maxMessageBodySize);
- throw new RuntimeException("message size exceeded");
+ throw new RuntimeException("message body size exceeded");
}
// properties from MessageExtBatch
diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 1a61ff346..690ad0cbe 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -41,6 +41,7 @@ import java.util.Map;
import static org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class BatchPutMessageTest {
@@ -186,6 +187,31 @@ public class BatchPutMessageTest {
}
}
+ private String generateKey(StringBuilder keyBuilder, MessageExt messageExt) { // returns the key "<topic>-<queueId>" built in the supplied builder
+ keyBuilder.setLength(0); // clear whatever the supplied builder already holds
+ keyBuilder.append(messageExt.getTopic());
+ keyBuilder.append('-');
+ keyBuilder.append(messageExt.getQueueId());
+ return keyBuilder.toString();
+ }
+
+ @Test
+ public void testPutLongBatchMessage() throws Exception{ // batch encode of a body larger than the configured max message size must throw
+ String topic = "batch-long-topic";
+ MessageStoreConfig messageStoreConfig = ((DefaultMessageStore) messageStore).getMessageStoreConfig();
+ CommitLog commitLog = ((DefaultMessageStore) messageStore).getCommitLog();
+ CommitLog.PutMessageThreadLocal putMessageThreadLocal = commitLog.getPutMessageThreadLocal().get();
+
+ MessageExtBatch messageExtBatch = new MessageExtBatch();
+ messageExtBatch.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 1]); // one byte over the configured max message size
+ messageExtBatch.setTopic(topic);
+ CommitLog.PutMessageContext putMessageContext = new CommitLog.PutMessageContext(generateKey(
+ putMessageThreadLocal.getKeyBuilder(), messageExtBatch));
+ RuntimeException runtimeException = assertThrows(RuntimeException.class, // the batch encode is expected to throw instead of returning
+ () -> putMessageThreadLocal.getEncoder().encode(messageExtBatch, putMessageContext));
+ assertThat("message body size exceeded").isEqualTo(runtimeException.getMessage()); // exception message must match exactly
+ }
+
private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
final int msgLen = 4 //TOTALSIZE
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index b565c5c66..0cabd18b0 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store;
import java.io.File;
import java.io.RandomAccessFile;
+import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
@@ -657,6 +658,44 @@ public class DefaultMessageStoreTest {
}
+ @Test
+ public void testPutLongMessage() throws Exception{
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+ CommitLog commitLog = ((DefaultMessageStore) messageStore).getCommitLog();
+ MessageStoreConfig messageStoreConfig = ((DefaultMessageStore) messageStore).getMessageStoreConfig();
+ CommitLog.PutMessageThreadLocal putMessageThreadLocal = commitLog.getPutMessageThreadLocal().get();
+
+ //body size, topic size and properties size are each exactly equal to their max size; encode is expected to return null
+ messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]);
+ messageExtBrokerInner.setTopic(new String(new byte[127]));
+ messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE]));
+ PutMessageResult encodeResult1 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
+ assertTrue(encodeResult1 == null);
+
+ //body size exceeds the max message body size by exactly one byte
+ messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 1]);
+ PutMessageResult encodeResult2 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
+ assertTrue(encodeResult2.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
+
+ //body size equal to max message size + 64KB (the extra space the encoder reserves outside the body)
+ messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 64 * 1024]);
+ PutMessageResult encodeResult3 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
+ assertTrue(encodeResult3.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
+
+ //message properties length exceeds the properties max size by one
+ messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]);
+ messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE+1]));
+ PutMessageResult encodeResult4 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
+ assertTrue(encodeResult4.getPutMessageStatus() == PutMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+
+ //total message length exceeds the encoding buffer capacity (body at max size, topic and properties of Short.MAX_VALUE bytes each)
+ messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]);
+ messageExtBrokerInner.setTopic(new String(new byte[Short.MAX_VALUE]));
+ messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE]));
+ PutMessageResult encodeResult5 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
+ assertTrue(encodeResult5.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
+ }
+
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,