You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2022/06/22 09:52:35 UTC

[rocketmq] branch develop updated: [ISSUE#4468] Optimize broker buffer length initialization (#4469)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new da01deb96 [ISSUE#4468] Optimize broker buffer length initialization (#4469)
da01deb96 is described below

commit da01deb961807c68b102aeccb1f06d9721ca700e
Author: Shengmin Wang <10...@users.noreply.github.com>
AuthorDate: Wed Jun 22 17:52:28 2022 +0800

    [ISSUE#4468] Optimize broker buffer length initialization (#4469)
    
    * optimize broker buffer length initialization
    
    * delete constructor with MaxMessageSize parameter in commitLog
    
    * add junit test testEncodeLongMessage()
    
    * Modify unit test to remove the implementation of reflection
    
    * [maven-release-plugin] prepare release rocketmq-all-4.9.4
    
    * [maven-release-plugin] prepare for next development iteration
    
    * add a blank line to do a new CI test
    
    * add some illegal message's junit test cases
    
    * modify batch message test, using assertThrows
    
    Co-authored-by: hill007299 <hi...@126.com>
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 19 ++++++++---
 .../apache/rocketmq/store/BatchPutMessageTest.java | 26 +++++++++++++++
 .../rocketmq/store/DefaultMessageStoreTest.java    | 39 ++++++++++++++++++++++
 3 files changed, 80 insertions(+), 4 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index ed0267b1b..8b8a86315 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1482,11 +1482,15 @@ public class CommitLog {
         private final ByteBuf byteBuf;
         // The maximum length of the message body.
         private final int maxMessageBodySize;
-
+        // The maximum length of the full message.
+        private final int maxMessageSize;
         MessageExtEncoder(final int maxMessageBodySize) {
             ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
-            byteBuf = alloc.directBuffer(maxMessageBodySize);
+            //Reserve 64kb for encoding buffer outside body
+            int maxMessageSize = maxMessageBodySize + 64 * 1024;
+            byteBuf = alloc.directBuffer(maxMessageSize);
             this.maxMessageBodySize = maxMessageBodySize;
+            this.maxMessageSize = maxMessageSize;
         }
 
         protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
@@ -1511,13 +1515,20 @@ public class CommitLog {
 
             final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
 
-            // Exceeds the maximum message
+            // Exceeds the maximum message body
             if (bodyLength > this.maxMessageBodySize) {
                 CommitLog.log.warn("message body size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                     + ", maxMessageSize: " + this.maxMessageBodySize);
                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
             }
 
+            // Exceeds the maximum message
+            if (msgLen > this.maxMessageSize) {
+                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+                        + ", maxMessageSize: " + this.maxMessageSize);
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+            }
+
             // 1 TOTALSIZE
             this.byteBuf.writeInt(msgLen);
             // 2 MAGICCODE
@@ -1575,7 +1586,7 @@ public class CommitLog {
             int totalLength = messagesByteBuff.limit();
             if (totalLength > this.maxMessageBodySize) {
                 CommitLog.log.warn("message body size exceeded, msg body size: " + totalLength + ", maxMessageSize: " + this.maxMessageBodySize);
-                throw new RuntimeException("message size exceeded");
+                throw new RuntimeException("message body size exceeded");
             }
 
             // properties from MessageExtBatch
diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 1a61ff346..690ad0cbe 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -41,6 +41,7 @@ import java.util.Map;
 
 import static org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class BatchPutMessageTest {
@@ -186,6 +187,31 @@ public class BatchPutMessageTest {
         }
 
     }
+    private String generateKey(StringBuilder keyBuilder, MessageExt messageExt) {
+        keyBuilder.setLength(0);
+        keyBuilder.append(messageExt.getTopic());
+        keyBuilder.append('-');
+        keyBuilder.append(messageExt.getQueueId());
+        return keyBuilder.toString();
+    }
+
+    @Test
+    public void testPutLongBatchMessage() throws Exception{
+        String topic = "batch-long-topic";
+        MessageStoreConfig messageStoreConfig = ((DefaultMessageStore) messageStore).getMessageStoreConfig();
+        CommitLog commitLog = ((DefaultMessageStore) messageStore).getCommitLog();
+        CommitLog.PutMessageThreadLocal putMessageThreadLocal = commitLog.getPutMessageThreadLocal().get();
+
+        MessageExtBatch messageExtBatch = new MessageExtBatch();
+        messageExtBatch.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 1]);
+        messageExtBatch.setTopic(topic);
+        CommitLog.PutMessageContext putMessageContext = new CommitLog.PutMessageContext(generateKey(
+                putMessageThreadLocal.getKeyBuilder(), messageExtBatch));
+        RuntimeException runtimeException = assertThrows(RuntimeException.class,
+                () -> putMessageThreadLocal.getEncoder().encode(messageExtBatch, putMessageContext));
+        assertThat("message body size exceeded").isEqualTo(runtimeException.getMessage());
+    }
+
 
     private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
         final int msgLen = 4 //TOTALSIZE
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index b565c5c66..0cabd18b0 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store;
 
 import java.io.File;
 import java.io.RandomAccessFile;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
@@ -657,6 +658,44 @@ public class DefaultMessageStoreTest {
 
     }
 
+    @Test
+    public void testPutLongMessage() throws Exception{
+        MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+        CommitLog commitLog = ((DefaultMessageStore) messageStore).getCommitLog();
+        MessageStoreConfig messageStoreConfig = ((DefaultMessageStore) messageStore).getMessageStoreConfig();
+        CommitLog.PutMessageThreadLocal putMessageThreadLocal = commitLog.getPutMessageThreadLocal().get();
+
+        //body size, topic size, properties size exactly equal to max size
+        messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]);
+        messageExtBrokerInner.setTopic(new String(new byte[127]));
+        messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE]));
+        PutMessageResult encodeResult1 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
+        assertTrue(encodeResult1 == null);
+
+        //body size exactly more than max message body size
+        messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 1]);
+        PutMessageResult encodeResult2 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
+        assertTrue(encodeResult2.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
+
+        //body size exactly equal to max message size
+        messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 64 * 1024]);
+        PutMessageResult encodeResult3 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
+        assertTrue(encodeResult3.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
+
+        //message properties length more than properties maxSize
+        messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]);
+        messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE+1]));
+        PutMessageResult encodeResult4 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
+        assertTrue(encodeResult4.getPutMessageStatus() == PutMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+
+        //message length more than buffer length capacity
+        messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]);
+        messageExtBrokerInner.setTopic(new String(new byte[Short.MAX_VALUE]));
+        messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE]));
+        PutMessageResult encodeResult5 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
+        assertTrue(encodeResult5.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
+    }
+
     private class MyMessageArrivingListener implements MessageArrivingListener {
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,