You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2021/05/20 02:34:27 UTC

[rocketmq] 01/01: Revert "[ISSUE #2865] Batch message send bug fix (#2866)"

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch revert-2866-batch_msg_bug_fix
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit d7f865ec22c6bd52cb8499824929a01f1a639029
Author: von gosling <vo...@apache.org>
AuthorDate: Thu May 20 10:34:14 2021 +0800

    Revert "[ISSUE #2865] Batch message send bug fix (#2866)"
    
    This reverts commit 16694485a7e111c601c0ec8c2a76a44cdd830067.
---
 .../rocketmq/store/dledger/DLedgerCommitLog.java   | 58 +++++++++++-----------
 1 file changed, 29 insertions(+), 29 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 9241ffe..9a6e7a7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -426,18 +426,17 @@ public class DLedgerCommitLog extends CommitLog {
         AppendFuture<AppendEntryResponse> dledgerFuture;
         EncodeResult encodeResult;
 
-        encodeResult = this.messageSerializer.serialize(msg);
-        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
-        }
-
         putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
         long elapsedTimeInLock;
         long queueOffset;
         try {
             beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(msg);
             queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
-            encodeResult.setQueueOffsetKey(queueOffset, false);
+            encodeResult.setQueueOffsetKey(queueOffset);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
+            }
             AppendEntryRequest request = new AppendEntryRequest();
             request.setGroup(dLedgerConfig.getGroup());
             request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
@@ -543,12 +542,6 @@ public class DLedgerCommitLog extends CommitLog {
         BatchAppendFuture<AppendEntryResponse> dledgerFuture;
         EncodeResult encodeResult;
 
-        encodeResult = this.messageSerializer.serialize(messageExtBatch);
-        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
-                    .status));
-        }
-
         putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
         msgIdBuilder.setLength(0);
         long elapsedTimeInLock;
@@ -556,8 +549,12 @@ public class DLedgerCommitLog extends CommitLog {
         long msgNum = 0;
         try {
             beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
-            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
-            encodeResult.setQueueOffsetKey(queueOffset, true);
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
             BatchAppendEntryRequest request = new BatchAppendEntryRequest();
             request.setGroup(dLedgerConfig.getGroup());
             request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
@@ -667,7 +664,7 @@ public class DLedgerCommitLog extends CommitLog {
         try {
             beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
             queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
-            encodeResult.setQueueOffsetKey(queueOffset, false);
+            encodeResult.setQueueOffsetKey(queueOffset);
             AppendEntryRequest request = new AppendEntryRequest();
             request.setGroup(dLedgerConfig.getGroup());
             request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
@@ -782,8 +779,7 @@ public class DLedgerCommitLog extends CommitLog {
         long msgNum = 0;
         try {
             beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
-            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
-            encodeResult.setQueueOffsetKey(queueOffset, true);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
             BatchAppendEntryRequest request = new BatchAppendEntryRequest();
             request.setGroup(dLedgerConfig.getGroup());
             request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
@@ -961,15 +957,8 @@ public class DLedgerCommitLog extends CommitLog {
             this.queueOffsetKey = queueOffsetKey;
         }
 
-        public void setQueueOffsetKey(long offset, boolean isBatch) {
-            if (!isBatch) {
-                this.data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
-                return;
-            }
-
-            for (byte[] data : batchData) {
-                ByteBuffer.wrap(data).putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset++);
-            }
+        public void setQueueOffsetKey(long offset) {
+            data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
         }
 
         public byte[] getData() {
@@ -988,6 +977,8 @@ public class DLedgerCommitLog extends CommitLog {
 
         // The maximum length of the message
         private final int maxMessageSize;
+        // Build Message Key
+        private final StringBuilder keyBuilder = new StringBuilder();
 
         MessageSerializer(final int size) {
             this.maxMessageSize = size;
@@ -1088,7 +1079,17 @@ public class DLedgerCommitLog extends CommitLog {
         }
 
         public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
-            String key = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId();
+            keyBuilder.setLength(0);
+            keyBuilder.append(messageExtBatch.getTopic());
+            keyBuilder.append('-');
+            keyBuilder.append(messageExtBatch.getQueueId());
+            String key = keyBuilder.toString();
+
+            Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
+            if (null == queueOffset) {
+                queueOffset = 0L;
+                DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
+            }
 
             int totalMsgLen = 0;
             ByteBuffer messagesByteBuff = messageExtBatch.wrap();
@@ -1153,7 +1154,7 @@ public class DLedgerCommitLog extends CommitLog {
                 // 5 FLAG
                 msgStoreItemMemory.putInt(flag);
                 // 6 QUEUEOFFSET
-                msgStoreItemMemory.putLong(0L);
+                msgStoreItemMemory.putLong(queueOffset++);
                 // 7 PHYSICALOFFSET
                 msgStoreItemMemory.putLong(0);
                 // 8 SYSFLAG
@@ -1209,7 +1210,6 @@ public class DLedgerCommitLog extends CommitLog {
             this.sbr = sbr;
         }
 
-        @Override
         public synchronized void release() {
             super.release();
             if (sbr != null) {