You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/06/28 06:33:21 UTC
[pulsar] branch master updated: [fix][Java Client] Fix large message sometimes cannot be split into chunks after PIP-132 (#16196)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 742158932c0 [fix][Java Client] Fix large message sometimes cannot be split into chunks after PIP-132 (#16196)
742158932c0 is described below
commit 742158932c0bf1f6fd8814253a5d944d529202a1
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Jun 28 14:33:13 2022 +0800
[fix][Java Client] Fix large message sometimes cannot be split into chunks after PIP-132 (#16196)
Fixes https://github.com/apache/pulsar/issues/16195
### Motivation
[PIP-132](https://github.com/apache/pulsar/pull/14007) considers the
message metadata size when computing the payload chunk size and the
number of chunks. However, it could prevent some messages whose size is
less than `maxMessageSize` from being sent. There are two reasons:
1. The `MessageMetadata` is updated after the payload chunk size has been
computed, so the actual metadata size is larger than the one used in that computation.
2. `OpSendMsg#getMessageHeaderAndPayloadSize` doesn't exclude all the bytes
other than the metadata and payload, e.g. the 4-byte checksum field.
For example, suppose the max message size is 100 and a 60-byte string is
sent with chunking enabled.
1. The initial metadata size is 25, so the payload chunk size is 75 and the
message won't be split into chunks.
2. After `serializeAndSendMessage`, the metadata size becomes 32, so the
serialized header content size is 4 (command length) + 8 (command) +
6 (magic and checksum) + 4 (metadata length) + 32 (metadata) = 54, and the
total size is 54 + 60 = 114 (see `headerContentSize` in
`serializeCommandSendWithSize`).
3. In `getMessageHeaderAndPayloadSize`, the returned value is computed
as 114 - 8 - 4 = 102 > 100, because the 6 bytes of magic and checksum and
the 4-byte metadata length field are not subtracted.
### Modifications
- Update the message metadata before computing the chunk size.
- Compute the correct size in `getMessageHeaderAndPayloadSize`.
### Verifying this change
Add `testChunkSize` to verify that messages of every size in the range
[1, maxMessageSize] can be sent successfully when chunking is enabled.
---
.../pulsar/client/impl/MessageChunkingTest.java | 23 +++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 71 ++++++++++++++--------
2 files changed, 68 insertions(+), 26 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 00e6c2f78e3..783e971f391 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
@@ -550,6 +551,28 @@ public class MessageChunkingTest extends ProducerConsumerBase {
assertEquals(consumer.conf.getExpireTimeOfIncompleteChunkedMessageMillis(), 12);
}
+ @Test
+ public void testChunkSize() throws Exception {
+ final int maxMessageSize = 50;
+ final int payloadChunkSize = maxMessageSize - 32/* the default message metadata size for string schema */;
+ this.conf.setMaxMessageSize(maxMessageSize);
+
+ final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("my-property/my-ns/test-chunk-size")
+ .enableChunking(true)
+ .enableBatching(false)
+ .create();
+ for (int size = 1; size <= maxMessageSize; size++) {
+ final MessageId messageId = producer.send(createMessagePayload(size));
+ log.info("Send {} bytes to {}", size, messageId);
+ if (size <= payloadChunkSize) {
+ assertEquals(messageId.getClass(), MessageIdImpl.class);
+ } else {
+ assertEquals(messageId.getClass(), ChunkMessageIdImpl.class);
+ }
+ }
+ }
+
private String createMessagePayload(int size) {
StringBuilder str = new StringBuilder();
Random rand = new Random();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 163a672d1c1..bb114e7f0e3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -441,7 +441,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
MessageImpl<?> msg = (MessageImpl<?>) message;
MessageMetadata msgMetadata = msg.getMessageBuilder();
ByteBuf payload = msg.getDataBuffer();
- int uncompressedSize = payload.readableBytes();
+ final int uncompressedSize = payload.readableBytes();
if (!canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) {
return;
@@ -488,6 +488,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return;
}
+ // Update the message metadata before computing the payload chunk size to avoid a large message cannot be split
+ // into chunks.
+ final long sequenceId = updateMessageMetadata(msgMetadata, uncompressedSize);
+
// send in chunks
int totalChunks;
int payloadChunkSize;
@@ -526,13 +530,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
try {
synchronized (this) {
int readStartIndex = 0;
- long sequenceId;
- if (!msgMetadata.hasSequenceId()) {
- sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
- msgMetadata.setSequenceId(sequenceId);
- } else {
- sequenceId = msgMetadata.getSequenceId();
- }
String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
@@ -554,7 +551,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
readStartIndex, payloadChunkSize, compressedPayload, compressed,
- compressedPayload.readableBytes(), uncompressedSize, callback, chunkedMessageCtx);
+ compressedPayload.readableBytes(), callback, chunkedMessageCtx);
readStartIndex = ((chunkId + 1) * payloadChunkSize);
}
}
@@ -567,6 +564,38 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
}
+ /**
+ * Update the message metadata except those fields that will be updated for chunks later.
+ *
+ * @param msgMetadata
+ * @param uncompressedSize
+ * @return the sequence id
+ */
+ private long updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
+ final long sequenceId;
+ if (!msgMetadata.hasSequenceId()) {
+ sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
+ msgMetadata.setSequenceId(sequenceId);
+ } else {
+ sequenceId = msgMetadata.getSequenceId();
+ }
+
+ if (!msgMetadata.hasPublishTime()) {
+ msgMetadata.setPublishTime(client.getClientClock().millis());
+
+ checkArgument(!msgMetadata.hasProducerName());
+
+ msgMetadata.setProducerName(producerName);
+
+ if (conf.getCompressionType() != CompressionType.NONE) {
+ msgMetadata
+ .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
+ }
+ msgMetadata.setUncompressedSize(uncompressedSize);
+ }
+ return sequenceId;
+ }
+
@Override
public int getNumOfPartitions() {
return 0;
@@ -583,7 +612,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
ByteBuf compressedPayload,
boolean compressed,
int compressedPayloadSize,
- int uncompressedSize,
SendCallback callback,
ChunkedMessageCtx chunkedMessageCtx) throws IOException {
ByteBuf chunkPayload = compressedPayload;
@@ -603,19 +631,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
.setNumChunksFromMsg(totalChunks)
.setTotalChunkMsgSize(compressedPayloadSize);
}
- if (!msgMetadata.hasPublishTime()) {
- msgMetadata.setPublishTime(client.getClientClock().millis());
-
- checkArgument(!msgMetadata.hasProducerName());
-
- msgMetadata.setProducerName(producerName);
-
- if (conf.getCompressionType() != CompressionType.NONE) {
- msgMetadata
- .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
- }
- msgMetadata.setUncompressedSize(uncompressedSize);
- }
if (canAddToBatch(msg) && totalChunks <= 1) {
if (canAddToCurrentBatch(msg)) {
@@ -1483,7 +1498,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
cmdHeader.markReaderIndex();
int totalSize = cmdHeader.readInt();
int cmdSize = cmdHeader.readInt();
- int msgHeadersAndPayloadSize = totalSize - cmdSize - 4;
+ // The totalSize includes:
+ // | cmdLength | cmdSize | magic and checksum | msgMetadataLength | msgMetadata |
+ // | --------- | ------- | ------------------ | ----------------- | ----------- |
+ // | 4 | | 6 | 4 | |
+ int msgHeadersAndPayloadSize = totalSize - 4 - cmdSize - 6 - 4;
cmdHeader.resetReaderIndex();
return msgHeadersAndPayloadSize;
}
@@ -2214,8 +2233,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
/**
* Check if final message size for non-batch and non-chunked messages is larger than max message size.
*/
- public boolean isMessageSizeExceeded(OpSendMsg op) {
- if (op.msg != null && op.totalChunks <= 1) {
+ private boolean isMessageSizeExceeded(OpSendMsg op) {
+ if (op.msg != null && !conf.isChunkingEnabled()) {
int messageSize = op.getMessageHeaderAndPayloadSize();
if (messageSize > ClientCnx.getMaxMessageSize()) {
releaseSemaphoreForSendOp(op);