You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/06/28 06:33:21 UTC

[pulsar] branch master updated: [fix][Java Client] Fix large message sometimes cannot be split into chunks after PIP-132 (#16196)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 742158932c0 [fix][Java Client] Fix large message sometimes cannot be split into chunks after PIP-132 (#16196)
742158932c0 is described below

commit 742158932c0bf1f6fd8814253a5d944d529202a1
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Jun 28 14:33:13 2022 +0800

    [fix][Java Client] Fix large message sometimes cannot be split into chunks after PIP-132 (#16196)
    
    Fixes https://github.com/apache/pulsar/issues/16195
    
    ### Motivation
    
    [PIP-132](https://github.com/apache/pulsar/pull/14007) considers the
    message metadata size when computing the payload chunk size and the
    number of chunks. However, this can prevent some messages whose size is
    less than `maxMessageSize` from being sent. There are two reasons:
    1. The `MessageMetadata` will be updated after computing the payload
       chunk size, i.e. the actual metadata size would be greater.
    2. `OpSendMsg#getMessageHeaderAndPayloadSize` doesn't subtract all of the
       bytes that belong to neither the metadata nor the payload, e.g. the
       4-byte checksum field.
    
    For example, if the max message size is 100, send a string whose size is
    60 with chunking enabled.
    1. The initial metadata size is 25, so the chunk size is 75 and the
       message won't be split into chunks.
    2. After `serializeAndSendMessage`, the metadata size becomes 32, so the
       serialized header's total size is 4 + 8 + 6 + 4 + 32 = 54, and the
       total size is 54 + 60 = 114 (see `headerContentSize` in
       `serializeCommandSendWithSize`).
    3. In `getMessageHeaderAndPayloadSize`, the returned value is computed
       as 114 - 8 - 4 = 102 > 100. The 6 bytes of magic number and checksum
       and the 4-byte metadata length field are not subtracted.
    
    ### Modifications
    
    - Update the message metadata before computing the chunk size.
    - Compute the correct size in `getMessageHeaderAndPayloadSize`.
    
    ### Verifying this change
    
    Add `testChunkSize` to verify all sizes in range [1, maxMessageSize] can
    be sent successfully when chunking is enabled.
---
 .../pulsar/client/impl/MessageChunkingTest.java    | 23 +++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 71 ++++++++++++++--------
 2 files changed, 68 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 00e6c2f78e3..783e971f391 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
@@ -550,6 +551,28 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         assertEquals(consumer.conf.getExpireTimeOfIncompleteChunkedMessageMillis(), 12);
     }
 
+    @Test
+    public void testChunkSize() throws Exception {
+        final int maxMessageSize = 50;
+        final int payloadChunkSize = maxMessageSize - 32/* the default message metadata size for string schema */;
+        this.conf.setMaxMessageSize(maxMessageSize);
+
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("my-property/my-ns/test-chunk-size")
+                .enableChunking(true)
+                .enableBatching(false)
+                .create();
+        for (int size = 1; size <= maxMessageSize; size++) {
+            final MessageId messageId = producer.send(createMessagePayload(size));
+            log.info("Send {} bytes to {}", size, messageId);
+            if (size <= payloadChunkSize) {
+                assertEquals(messageId.getClass(), MessageIdImpl.class);
+            } else {
+                assertEquals(messageId.getClass(), ChunkMessageIdImpl.class);
+            }
+        }
+    }
+
     private String createMessagePayload(int size) {
         StringBuilder str = new StringBuilder();
         Random rand = new Random();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 163a672d1c1..bb114e7f0e3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -441,7 +441,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         MessageImpl<?> msg = (MessageImpl<?>) message;
         MessageMetadata msgMetadata = msg.getMessageBuilder();
         ByteBuf payload = msg.getDataBuffer();
-        int uncompressedSize = payload.readableBytes();
+        final int uncompressedSize = payload.readableBytes();
 
         if (!canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) {
             return;
@@ -488,6 +488,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             return;
         }
 
+        // Update the message metadata before computing the payload chunk size to avoid a large message cannot be split
+        // into chunks.
+        final long sequenceId = updateMessageMetadata(msgMetadata, uncompressedSize);
+
         // send in chunks
         int totalChunks;
         int payloadChunkSize;
@@ -526,13 +530,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         try {
             synchronized (this) {
                 int readStartIndex = 0;
-                long sequenceId;
-                if (!msgMetadata.hasSequenceId()) {
-                    sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
-                    msgMetadata.setSequenceId(sequenceId);
-                } else {
-                    sequenceId = msgMetadata.getSequenceId();
-                }
                 String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
                 ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
                 byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
@@ -554,7 +551,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     }
                     serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
                             readStartIndex, payloadChunkSize, compressedPayload, compressed,
-                            compressedPayload.readableBytes(), uncompressedSize, callback, chunkedMessageCtx);
+                            compressedPayload.readableBytes(), callback, chunkedMessageCtx);
                     readStartIndex = ((chunkId + 1) * payloadChunkSize);
                 }
             }
@@ -567,6 +564,38 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
+    /**
+     * Update the message metadata except those fields that will be updated for chunks later.
+     *
+     * @param msgMetadata
+     * @param uncompressedSize
+     * @return the sequence id
+     */
+    private long updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
+        final long sequenceId;
+        if (!msgMetadata.hasSequenceId()) {
+            sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
+            msgMetadata.setSequenceId(sequenceId);
+        } else {
+            sequenceId = msgMetadata.getSequenceId();
+        }
+
+        if (!msgMetadata.hasPublishTime()) {
+            msgMetadata.setPublishTime(client.getClientClock().millis());
+
+            checkArgument(!msgMetadata.hasProducerName());
+
+            msgMetadata.setProducerName(producerName);
+
+            if (conf.getCompressionType() != CompressionType.NONE) {
+                msgMetadata
+                        .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
+            }
+            msgMetadata.setUncompressedSize(uncompressedSize);
+        }
+        return sequenceId;
+    }
+
     @Override
     public int getNumOfPartitions() {
         return 0;
@@ -583,7 +612,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                                          ByteBuf compressedPayload,
                                          boolean compressed,
                                          int compressedPayloadSize,
-                                         int uncompressedSize,
                                          SendCallback callback,
                                          ChunkedMessageCtx chunkedMessageCtx) throws IOException {
         ByteBuf chunkPayload = compressedPayload;
@@ -603,19 +631,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 .setNumChunksFromMsg(totalChunks)
                 .setTotalChunkMsgSize(compressedPayloadSize);
         }
-        if (!msgMetadata.hasPublishTime()) {
-            msgMetadata.setPublishTime(client.getClientClock().millis());
-
-            checkArgument(!msgMetadata.hasProducerName());
-
-            msgMetadata.setProducerName(producerName);
-
-            if (conf.getCompressionType() != CompressionType.NONE) {
-                msgMetadata
-                        .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
-            }
-            msgMetadata.setUncompressedSize(uncompressedSize);
-        }
 
         if (canAddToBatch(msg) && totalChunks <= 1) {
             if (canAddToCurrentBatch(msg)) {
@@ -1483,7 +1498,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             cmdHeader.markReaderIndex();
             int totalSize = cmdHeader.readInt();
             int cmdSize = cmdHeader.readInt();
-            int msgHeadersAndPayloadSize = totalSize - cmdSize - 4;
+            // The totalSize includes:
+            // | cmdLength | cmdSize | magic and checksum | msgMetadataLength | msgMetadata |
+            // | --------- | ------- | ------------------ | ----------------- | ----------- |
+            // | 4         |         | 6                  | 4                 |             |
+            int msgHeadersAndPayloadSize = totalSize - 4 - cmdSize - 6 - 4;
             cmdHeader.resetReaderIndex();
             return msgHeadersAndPayloadSize;
         }
@@ -2214,8 +2233,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     /**
      *  Check if final message size for non-batch and non-chunked messages is larger than max message size.
      */
-    public boolean isMessageSizeExceeded(OpSendMsg op) {
-        if (op.msg != null && op.totalChunks <= 1) {
+    private boolean isMessageSizeExceeded(OpSendMsg op) {
+        if (op.msg != null && !conf.isChunkingEnabled()) {
             int messageSize = op.getMessageHeaderAndPayloadSize();
             if (messageSize > ClientCnx.getMaxMessageSize()) {
                 releaseSemaphoreForSendOp(op);