You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/12/07 03:42:50 UTC

[rocketmq] branch develop updated: [Issue #3476] Fix last separator of properties string is missing when using batch send. (#3479)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 83accb6  [Issue #3476] Fix last separator of properties string is missing when using batch send. (#3479)
83accb6 is described below

commit 83accb6f12d314be9d164a940009584091652570
Author: huangli <ar...@gmail.com>
AuthorDate: Tue Dec 7 11:42:43 2021 +0800

    [Issue #3476] Fix last separator of properties string is missing when using batch send. (#3479)
    
    This problem was introduced in 4.9.1 and may cause incorrect message tags.
---
 store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 12 +++++++++---
 .../java/org/apache/rocketmq/store/BatchPutMessageTest.java  |  2 +-
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 36db2f5..112e9bc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1607,13 +1607,16 @@ public class CommitLog {
                 short propertiesLen = messagesByteBuff.getShort();
                 int propertiesPos = messagesByteBuff.position();
                 messagesByteBuff.position(propertiesPos + propertiesLen);
+                boolean needAppendLastPropertySeparator = propertiesLen > 0 && batchPropLen > 0
+                            && messagesByteBuff.get(messagesByteBuff.position() - 1) != MessageDecoder.PROPERTY_SEPARATOR;
 
                 final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
 
                 final int topicLength = topicData.length;
 
-                final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength,
-                        propertiesLen + batchPropLen);
+                int totalPropLen = needAppendLastPropertySeparator ? propertiesLen + batchPropLen + 1
+                                                                     : propertiesLen + batchPropLen;
+                final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen);
 
                 // Exceeds the maximum message
                 if (msgLen > this.maxMessageSize) {
@@ -1666,11 +1669,14 @@ public class CommitLog {
                 this.encoderBuffer.put((byte) topicLength);
                 this.encoderBuffer.put(topicData);
                 // 17 PROPERTIES
-                this.encoderBuffer.putShort((short) (propertiesLen + batchPropLen));
+                this.encoderBuffer.putShort((short) totalPropLen);
                 if (propertiesLen > 0) {
                     this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
                 }
                 if (batchPropLen > 0) {
+                    if (needAppendLastPropertySeparator) {
+                        this.encoderBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
+                    }
                     this.encoderBuffer.put(batchPropData, 0, batchPropLen);
                 }
             }
diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 2c1fd25..3bc52e3 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -105,7 +105,7 @@ public class BatchPutMessageTest {
             short propertiesLength = (short) propertiesBytes.length;
             final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
             final int topicLength = topicData.length;
-            msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength+batchPropLen) + msgLengthArr[j - 1];
+            msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength+batchPropLen+1) + msgLengthArr[j - 1];
             j++;
         }
         byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);