You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/16 05:42:56 UTC

[rocketmq] branch develop updated: [ISSUE apache#4058] DLedgerCommitLog support LMQ (#4059)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6273782d6 [ISSUE apache#4058] DLedgerCommitLog support LMQ (#4059)
6273782d6 is described below

commit 6273782d663483e5c278f9ef2de15ca137fe7949
Author: chaiyx <ch...@163.com>
AuthorDate: Thu Jun 16 13:42:48 2022 +0800

    [ISSUE apache#4058] DLedgerCommitLog support LMQ (#4059)
    
    Co-authored-by: cserwen <cs...@163.com>
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 12 ++++++-----
 .../org/apache/rocketmq/store/MultiDispatch.java   | 19 +++++++++++++++++-
 .../rocketmq/store/dledger/DLedgerCommitLog.java   | 23 ++++++++++++++++++----
 3 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index a3acefdee..ed0267b1b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -78,7 +78,7 @@ public class CommitLog {
 
     private volatile Set<String> fullStorePaths = Collections.emptySet();
 
-    private final MultiDispatch multiDispatch;
+    protected final MultiDispatch multiDispatch;
     private final FlushDiskWatcher flushDiskWatcher;
 
     public CommitLog(final DefaultMessageStore defaultMessageStore) {
@@ -650,11 +650,13 @@ public class CommitLog {
         }
 
         PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
-        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
-        if (encodeResult != null) {
-            return CompletableFuture.completedFuture(encodeResult);
+        if (!multiDispatch.isMultiDispatchMsg(msg)) {
+            PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
+            if (encodeResult != null) {
+                return CompletableFuture.completedFuture(encodeResult);
+            }
+            msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
         }
-        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
         PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
 
         long elapsedTimeInLock = 0;
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
index 679eed123..e74b6ea9e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.CommitLog.MessageExtEncoder;
+import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
 
 /**
  * not-thread-safe
@@ -34,10 +35,22 @@ public class MultiDispatch {
     private final StringBuilder keyBuilder = new StringBuilder();
     private final DefaultMessageStore messageStore;
     private final CommitLog commitLog;
+    private boolean isDLedger;
 
     public MultiDispatch(DefaultMessageStore messageStore, CommitLog commitLog) {
         this.messageStore = messageStore;
         this.commitLog = commitLog;
+        isDLedger = commitLog instanceof DLedgerCommitLog;
+    }
+
+    public boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
+        if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
+            return false;
+        }
+        if (StringUtils.isBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
+            return false;
+        }
+        return true;
     }
 
     public String queueKey(String queueName, MessageExtBrokerInner msgInner) {
@@ -83,7 +96,11 @@ public class MultiDispatch {
         MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
             StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
         removeWaitStorePropertyString(msgInner);
-        return rebuildMsgInner(msgInner);
+        if (isDLedger) {
+            return true;
+        } else {
+            return rebuildMsgInner(msgInner);
+        }
     }
 
     private void removeWaitStorePropertyString(MessageExtBrokerInner msgInner) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index a88bcd498..5474fa46b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -435,17 +435,31 @@ public class DLedgerCommitLog extends CommitLog {
         // Back to Results
         AppendMessageResult appendResult;
         AppendFuture<AppendEntryResponse> dledgerFuture;
-        EncodeResult encodeResult;
+        EncodeResult encodeResult = null;
 
-        encodeResult = this.messageSerializer.serialize(msg);
-        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
+        boolean isMultiDispatch = multiDispatch.isMultiDispatchMsg(msg);
+        if (!isMultiDispatch) {
+            encodeResult = this.messageSerializer.serialize(msg);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
+            }
         }
         putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
         long elapsedTimeInLock;
         long queueOffset;
         try {
             beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            if (isMultiDispatch) {
+                boolean multiDispatchWrapResult = multiDispatch.wrapMultiDispatch(msg);
+                if (!multiDispatchWrapResult) {
+                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+                } else {
+                    encodeResult = this.messageSerializer.serialize(msg);
+                    if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
+                    }
+                }
+            }
             queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
             encodeResult.setQueueOffsetKey(queueOffset, false);
             AppendEntryRequest request = new AppendEntryRequest();
@@ -472,6 +486,7 @@ public class DLedgerCommitLog extends CommitLog {
                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                     // The next update ConsumeQueue information
                     DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
+                    multiDispatch.updateMultiQueueOffset(msg);
                     break;
                 default:
                     break;