You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2022/06/30 08:07:00 UTC
[rocketmq] branch develop updated: [ISSUE#4520] [Optimization] Implenment adjusting maxMessageSize dynamicly (#4521)
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 493e52b94 [ISSUE#4520] [Optimization] Implenment adjusting maxMessageSize dynamicly (#4521)
493e52b94 is described below
commit 493e52b945c7fdbdddb759847816d4952c966cff
Author: Shengmin Wang <10...@users.noreply.github.com>
AuthorDate: Thu Jun 30 16:06:43 2022 +0800
[ISSUE#4520] [Optimization] Implenment adjusting maxMessageSize dynamicly (#4521)
* add updateEncoderBufferCapacity funcation
* add junit test, testDynamicMaxMessageSize()
* add updateEncoderBufferCapacity in MultiDispatch class
* add notion
* add updateMaxMessageBodySize in DLedger mode
* modify code style, add some whitespaces
* remove the adaptation of DLeger mode
* modify notion
* DLedger mode recovery
* check newMaxMessageSize need to >= 10
---
.../java/org/apache/rocketmq/store/CommitLog.java | 32 +++++++++++++++++++---
.../org/apache/rocketmq/store/MultiDispatch.java | 12 +++++++-
.../rocketmq/store/DefaultMessageStoreTest.java | 22 +++++++++++++++
.../store/dledger/DLedgerCommitlogTest.java | 1 -
4 files changed, 61 insertions(+), 6 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 8b8a86315..efed87f67 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -604,6 +604,15 @@ public class CommitLog {
return keyBuilder.toString();
}
+ public void updateMaxMessageSize(PutMessageThreadLocal putMessageThreadLocal) {
+ // dynamically adjust maxMessageSize, but not support DLedger mode temporarily
+ int newMaxMessageSize = this.defaultMessageStore.getMessageStoreConfig().getMaxMessageSize();
+ if (newMaxMessageSize >= 10 &&
+ putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) {
+ putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize);
+ }
+ }
+
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
@@ -650,6 +659,7 @@ public class CommitLog {
}
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
+ updateMaxMessageSize(putMessageThreadLocal);
if (!multiDispatch.isMultiDispatchMsg(msg)) {
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
@@ -768,6 +778,7 @@ public class CommitLog {
//fine-grained lock instead of the coarse-grained
PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get();
+ updateMaxMessageSize(pmThreadLocal);
MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder();
PutMessageContext putMessageContext = new PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch));
@@ -1479,15 +1490,16 @@ public class CommitLog {
}
public static class MessageExtEncoder {
- private final ByteBuf byteBuf;
+ private ByteBuf byteBuf;
// The maximum length of the message body.
- private final int maxMessageBodySize;
+ private int maxMessageBodySize;
// The maximum length of the full message.
- private final int maxMessageSize;
+ private int maxMessageSize;
MessageExtEncoder(final int maxMessageBodySize) {
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
//Reserve 64kb for encoding buffer outside body
- int maxMessageSize = maxMessageBodySize + 64 * 1024;
+ int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ?
+ maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
byteBuf = alloc.directBuffer(maxMessageSize);
this.maxMessageBodySize = maxMessageBodySize;
this.maxMessageSize = maxMessageSize;
@@ -1692,6 +1704,18 @@ public class CommitLog {
public ByteBuffer getEncoderBuffer() {
return this.byteBuf.nioBuffer();
}
+
+ public int getMaxMessageBodySize() {
+ return this.maxMessageBodySize;
+ }
+
+ public void updateEncoderBufferCapacity(int newMaxMessageBodySize) {
+ this.maxMessageBodySize = newMaxMessageBodySize;
+ //Reserve 64kb for encoding buffer outside body
+ this.maxMessageSize = Integer.MAX_VALUE - newMaxMessageBodySize >= 64 * 1024 ?
+ this.maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
+ this.byteBuf.capacity(this.maxMessageSize);
+ }
}
static class PutMessageThreadLocal {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
index e74b6ea9e..3ae3ac612 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
@@ -116,8 +116,18 @@ public class MultiDispatch {
}
}
+ public void updateMaxMessageSize(CommitLog.PutMessageThreadLocal putMessageThreadLocal) {
+ int newMaxMessageSize = this.messageStore.getMessageStoreConfig().getMaxMessageSize();
+ if (newMaxMessageSize >= 10 &&
+ putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) {
+ putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize);
+ }
+ }
+
private boolean rebuildMsgInner(MessageExtBrokerInner msgInner) {
- MessageExtEncoder encoder = this.commitLog.getPutMessageThreadLocal().get().getEncoder();
+ CommitLog.PutMessageThreadLocal putMessageThreadLocal = this.commitLog.getPutMessageThreadLocal().get();
+ updateMaxMessageSize(putMessageThreadLocal);
+ MessageExtEncoder encoder = putMessageThreadLocal.getEncoder();
PutMessageResult encodeResult = encoder.encode(msgInner);
if (encodeResult != null) {
LOGGER.error("rebuild msgInner for multiDispatch", encodeResult);
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 0cabd18b0..491437d80 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -696,6 +696,28 @@ public class DefaultMessageStoreTest {
assertTrue(encodeResult5.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
}
+ @Test
+ public void testDynamicMaxMessageSize(){
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+ MessageStoreConfig messageStoreConfig = ((DefaultMessageStore) messageStore).getMessageStoreConfig();
+ int originMaxMessageSize = messageStoreConfig.getMaxMessageSize();
+
+ messageExtBrokerInner.setBody(new byte[originMaxMessageSize + 10]);
+ PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ assertTrue(putMessageResult.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
+
+ int newMaxMessageSize = originMaxMessageSize + 10;
+ messageStoreConfig.setMaxMessageSize(newMaxMessageSize);
+ putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ assertTrue(putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK);
+
+ messageStoreConfig.setMaxMessageSize(10);
+ putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ assertTrue(putMessageResult.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
+
+ messageStoreConfig.setMaxMessageSize(originMaxMessageSize);
+ }
+
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index 88637db2d..a1f8c803e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@ -382,5 +382,4 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
followerStore.shutdown();
}
-
}