You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/16 05:42:56 UTC
[rocketmq] branch develop updated: [ISSUE apache#4058] DLedgerCommitLog support LMQ (#4059)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6273782d6 [ISSUE apache#4058] DLedgerCommitLog support LMQ (#4059)
6273782d6 is described below
commit 6273782d663483e5c278f9ef2de15ca137fe7949
Author: chaiyx <ch...@163.com>
AuthorDate: Thu Jun 16 13:42:48 2022 +0800
[ISSUE apache#4058] DLedgerCommitLog support LMQ (#4059)
Co-authored-by: cserwen <cs...@163.com>
---
.../java/org/apache/rocketmq/store/CommitLog.java | 12 ++++++-----
.../org/apache/rocketmq/store/MultiDispatch.java | 19 +++++++++++++++++-
.../rocketmq/store/dledger/DLedgerCommitLog.java | 23 ++++++++++++++++++----
3 files changed, 44 insertions(+), 10 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index a3acefdee..ed0267b1b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -78,7 +78,7 @@ public class CommitLog {
private volatile Set<String> fullStorePaths = Collections.emptySet();
- private final MultiDispatch multiDispatch;
+ protected final MultiDispatch multiDispatch;
private final FlushDiskWatcher flushDiskWatcher;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
@@ -650,11 +650,13 @@ public class CommitLog {
}
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
- PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
- if (encodeResult != null) {
- return CompletableFuture.completedFuture(encodeResult);
+ if (!multiDispatch.isMultiDispatchMsg(msg)) {
+ PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
+ if (encodeResult != null) {
+ return CompletableFuture.completedFuture(encodeResult);
+ }
+ msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
}
- msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
long elapsedTimeInLock = 0;
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
index 679eed123..e74b6ea9e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.CommitLog.MessageExtEncoder;
+import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
/**
* not-thread-safe
@@ -34,10 +35,22 @@ public class MultiDispatch {
private final StringBuilder keyBuilder = new StringBuilder();
private final DefaultMessageStore messageStore;
private final CommitLog commitLog;
+ private boolean isDLedger;
public MultiDispatch(DefaultMessageStore messageStore, CommitLog commitLog) {
this.messageStore = messageStore;
this.commitLog = commitLog;
+ isDLedger = commitLog instanceof DLedgerCommitLog;
+ }
+
+ public boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
+ if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
+ return false;
+ }
+ if (StringUtils.isBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
+ return false;
+ }
+ return true;
}
public String queueKey(String queueName, MessageExtBrokerInner msgInner) {
@@ -83,7 +96,11 @@ public class MultiDispatch {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
removeWaitStorePropertyString(msgInner);
- return rebuildMsgInner(msgInner);
+ if (isDLedger) {
+ return true;
+ } else {
+ return rebuildMsgInner(msgInner);
+ }
}
private void removeWaitStorePropertyString(MessageExtBrokerInner msgInner) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index a88bcd498..5474fa46b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -435,17 +435,31 @@ public class DLedgerCommitLog extends CommitLog {
// Back to Results
AppendMessageResult appendResult;
AppendFuture<AppendEntryResponse> dledgerFuture;
- EncodeResult encodeResult;
+ EncodeResult encodeResult = null;
- encodeResult = this.messageSerializer.serialize(msg);
- if (encodeResult.status != AppendMessageStatus.PUT_OK) {
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
+ boolean isMultiDispatch = multiDispatch.isMultiDispatchMsg(msg);
+ if (!isMultiDispatch) {
+ encodeResult = this.messageSerializer.serialize(msg);
+ if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
+ }
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long elapsedTimeInLock;
long queueOffset;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+ if (isMultiDispatch) {
+ boolean multiDispatchWrapResult = multiDispatch.wrapMultiDispatch(msg);
+ if (!multiDispatchWrapResult) {
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ } else {
+ encodeResult = this.messageSerializer.serialize(msg);
+ if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
+ }
+ }
+ }
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset, false);
AppendEntryRequest request = new AppendEntryRequest();
@@ -472,6 +486,7 @@ public class DLedgerCommitLog extends CommitLog {
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
+ multiDispatch.updateMultiQueueOffset(msg);
break;
default:
break;