You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2022/06/30 08:07:00 UTC

[rocketmq] branch develop updated: [ISSUE#4520] [Optimization] Implement adjusting maxMessageSize dynamically (#4521)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 493e52b94 [ISSUE#4520] [Optimization] Implement adjusting maxMessageSize dynamically (#4521)
493e52b94 is described below

commit 493e52b945c7fdbdddb759847816d4952c966cff
Author: Shengmin Wang <10...@users.noreply.github.com>
AuthorDate: Thu Jun 30 16:06:43 2022 +0800

    [ISSUE#4520] [Optimization] Implement adjusting maxMessageSize dynamically (#4521)
    
    * add updateEncoderBufferCapacity function
    
    * add junit test, testDynamicMaxMessageSize()
    
    * add updateEncoderBufferCapacity in MultiDispatch class
    
    * add comment
    
    * add updateMaxMessageBodySize in DLedger mode
    
    * modify code style, add some whitespaces
    
    * remove the adaptation of DLedger mode
    
    * modify comment
    
    * DLedger mode recovery
    
    * check that newMaxMessageSize is >= 10
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 32 +++++++++++++++++++---
 .../org/apache/rocketmq/store/MultiDispatch.java   | 12 +++++++-
 .../rocketmq/store/DefaultMessageStoreTest.java    | 22 +++++++++++++++
 .../store/dledger/DLedgerCommitlogTest.java        |  1 -
 4 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 8b8a86315..efed87f67 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -604,6 +604,15 @@ public class CommitLog {
         return keyBuilder.toString();
     }
 
+    public void updateMaxMessageSize(PutMessageThreadLocal putMessageThreadLocal) {
+        // dynamically adjust maxMessageSize, but not support DLedger mode temporarily
+        int newMaxMessageSize = this.defaultMessageStore.getMessageStoreConfig().getMaxMessageSize();
+        if (newMaxMessageSize >= 10 &&
+                putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) {
+            putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize);
+        }
+    }
+
     public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
         // Set the storage time
         msg.setStoreTimestamp(System.currentTimeMillis());
@@ -650,6 +659,7 @@ public class CommitLog {
         }
 
         PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
+        updateMaxMessageSize(putMessageThreadLocal);
         if (!multiDispatch.isMultiDispatchMsg(msg)) {
             PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
             if (encodeResult != null) {
@@ -768,6 +778,7 @@ public class CommitLog {
 
         //fine-grained lock instead of the coarse-grained
         PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get();
+        updateMaxMessageSize(pmThreadLocal);
         MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder();
 
         PutMessageContext putMessageContext = new PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch));
@@ -1479,15 +1490,16 @@ public class CommitLog {
     }
 
     public static class MessageExtEncoder {
-        private final ByteBuf byteBuf;
+        private ByteBuf byteBuf;
         // The maximum length of the message body.
-        private final int maxMessageBodySize;
+        private int maxMessageBodySize;
         // The maximum length of the full message.
-        private final int maxMessageSize;
+        private int maxMessageSize;
         MessageExtEncoder(final int maxMessageBodySize) {
             ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
             //Reserve 64kb for encoding buffer outside body
-            int maxMessageSize = maxMessageBodySize + 64 * 1024;
+            int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ?
+                    maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
             byteBuf = alloc.directBuffer(maxMessageSize);
             this.maxMessageBodySize = maxMessageBodySize;
             this.maxMessageSize = maxMessageSize;
@@ -1692,6 +1704,18 @@ public class CommitLog {
         public ByteBuffer getEncoderBuffer() {
             return this.byteBuf.nioBuffer();
         }
+
+        public int getMaxMessageBodySize() {
+            return this.maxMessageBodySize;
+        }
+
+        public void updateEncoderBufferCapacity(int newMaxMessageBodySize) {
+            this.maxMessageBodySize = newMaxMessageBodySize;
+            //Reserve 64kb for encoding buffer outside body
+            this.maxMessageSize = Integer.MAX_VALUE - newMaxMessageBodySize >= 64 * 1024 ?
+                    this.maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
+            this.byteBuf.capacity(this.maxMessageSize);
+        }
     }
 
     static class PutMessageThreadLocal {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
index e74b6ea9e..3ae3ac612 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
@@ -116,8 +116,18 @@ public class MultiDispatch {
         }
     }
 
+    public void updateMaxMessageSize(CommitLog.PutMessageThreadLocal putMessageThreadLocal) {
+        int newMaxMessageSize = this.messageStore.getMessageStoreConfig().getMaxMessageSize();
+        if (newMaxMessageSize >= 10 &&
+                putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) {
+            putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize);
+        }
+    }
+
     private boolean rebuildMsgInner(MessageExtBrokerInner msgInner) {
-        MessageExtEncoder encoder = this.commitLog.getPutMessageThreadLocal().get().getEncoder();
+        CommitLog.PutMessageThreadLocal putMessageThreadLocal = this.commitLog.getPutMessageThreadLocal().get();
+        updateMaxMessageSize(putMessageThreadLocal);
+        MessageExtEncoder encoder = putMessageThreadLocal.getEncoder();
         PutMessageResult encodeResult = encoder.encode(msgInner);
         if (encodeResult != null) {
             LOGGER.error("rebuild msgInner for multiDispatch", encodeResult);
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 0cabd18b0..491437d80 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -696,6 +696,28 @@ public class DefaultMessageStoreTest {
         assertTrue(encodeResult5.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
     }
 
+    @Test
+    public void testDynamicMaxMessageSize(){
+        MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+        MessageStoreConfig messageStoreConfig = ((DefaultMessageStore) messageStore).getMessageStoreConfig();
+        int originMaxMessageSize = messageStoreConfig.getMaxMessageSize();
+
+        messageExtBrokerInner.setBody(new byte[originMaxMessageSize + 10]);
+        PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        assertTrue(putMessageResult.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
+
+        int newMaxMessageSize = originMaxMessageSize + 10;
+        messageStoreConfig.setMaxMessageSize(newMaxMessageSize);
+        putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        assertTrue(putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK);
+
+        messageStoreConfig.setMaxMessageSize(10);
+        putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        assertTrue(putMessageResult.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
+
+        messageStoreConfig.setMaxMessageSize(originMaxMessageSize);
+    }
+
     private class MyMessageArrivingListener implements MessageArrivingListener {
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index 88637db2d..a1f8c803e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@ -382,5 +382,4 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
         followerStore.shutdown();
     }
 
-
 }