You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/14 17:04:16 UTC
[pulsar] branch master updated: In Java allow messages that
compress to <5mb to be sent with batching enabled (#3718)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4b3fed9 In Java allow messages that compress to <5mb to be sent with batching enabled (#3718)
4b3fed9 is described below
commit 4b3fed9bd43f0df18bacf947bc9c073841dec2c5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 14 10:04:08 2019 -0700
In Java allow messages that compress to <5mb to be sent with batching enabled (#3718)
* In Java allow messages that compress to <5mb to be sent with batching enabled
* The V1 test is no longer accurate since the behavior changed, and the new behavior is already covered in SimpleProducerConsumerTest
---
.../client/api/SimpleProducerConsumerTest.java | 27 +++++---
.../client/api/v1/V1_ProducerConsumerTest.java | 75 ----------------------
.../pulsar/client/impl/BatchMessageContainer.java | 2 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 40 +++++++-----
4 files changed, 45 insertions(+), 99 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index d51c686..af9be10 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -651,9 +651,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
* <pre>
* send msg with size > MAX_SIZE (5 MB)
* a. non-batch with compression: pass
- * b. batch-msg with compression: fail
+ * b. batch-msg with compression: pass
* c. non-batch w/o compression: fail
* d. non-batch with compression, consumer consume: pass
+ * e. batch-msg w/o compression: fail
* </pre>
*
* @throws Exception
@@ -673,18 +674,13 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
producer.close();
- // (b) batch-msg
+ // (b) batch-msg with compression
producer = pulsarClient.newProducer().topic(topic)
.enableBatching(true)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.compressionType(CompressionType.LZ4)
.create();
- try {
- producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
- fail("Should have thrown exception");
- } catch (PulsarClientException.InvalidMessageException e) {
- // OK
- }
+ producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
producer.close();
// (c) non-batch msg without compression
@@ -712,6 +708,21 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
producer.send(content);
assertEquals(consumer.receive().getData(), content);
producer.close();
+
+ // (e) batch-msg w/o compression
+ producer = pulsarClient.newProducer().topic(topic)
+ .enableBatching(true)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .compressionType(CompressionType.NONE)
+ .create();
+ try {
+ producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+ fail("Should have thrown exception");
+ } catch (PulsarClientException.InvalidMessageException e) {
+ // OK
+ }
+ producer.close();
+
consumer.close();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index ab7163a..161c374 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -618,81 +618,6 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
}
/**
- * Verifies non-batch message size being validated after performing compression while batch-messaging validates
- * before compression of message
- *
- * <pre>
- * send msg with size > MAX_SIZE (5 MB)
- * a. non-batch with compression: pass
- * b. batch-msg with compression: fail
- * c. non-batch w/o compression: fail
- * d. non-batch with compression, consumer consume: pass
- * </pre>
- *
- * @throws Exception
- */
- @Test
- public void testSendBigMessageSizeButCompressed() throws Exception {
- log.info("-- Starting {} test --", methodName);
-
- final String topic = "persistent://my-property/use/my-ns/bigMsg";
-
- // (a) non-batch msg with compression
- Producer<byte[]> producer = pulsarClient.newProducer()
- .topic(topic)
- .enableBatching(false)
- .compressionType(CompressionType.LZ4)
- .create();
- producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
- producer.close();
-
- // (b) batch-msg
- producer = pulsarClient.newProducer()
- .topic(topic)
- .enableBatching(true)
- .compressionType(CompressionType.LZ4)
- .create();
- try {
- producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
- fail("Should have thrown exception");
- } catch (PulsarClientException.InvalidMessageException e) {
- // OK
- }
- producer.close();
-
- // (c) non-batch msg without compression
- producer = pulsarClient.newProducer()
- .topic(topic)
- .enableBatching(false)
- .compressionType(CompressionType.NONE)
- .create();
- try {
- producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
- fail("Should have thrown exception");
- } catch (PulsarClientException.InvalidMessageException e) {
- // OK
- }
- producer.close();
-
- // (d) non-batch msg with compression and try to consume message
- producer = pulsarClient.newProducer()
- .topic(topic)
- .enableBatching(false)
- .compressionType(CompressionType.LZ4)
- .create();
- Consumer<byte[]> consumer = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("sub1")
- .subscribe();
- byte[] content = new byte[PulsarDecoder.MaxMessageSize + 10];
- producer.send(content);
- assertEquals(consumer.receive().getValue(), content);
- producer.close();
- consumer.close();
-
- }
-
- /**
* Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages -
* EntryCache should be cleaned : Once active subscription consumes messages
*
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
index 4d2ca09..0be5aae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
@@ -92,7 +92,7 @@ class BatchMessageContainer {
sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
batchedMessageMetadataAndPayload = PooledByteBufAllocator.DEFAULT
- .buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES), PulsarDecoder.MaxMessageSize);
+ .buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES));
}
if (previousCallback != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a393b48..6dcaca0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -302,22 +302,21 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (!isBatchMessagingEnabled()) {
compressedPayload = compressor.encode(payload);
payload.release();
- }
- int compressedSize = compressedPayload.readableBytes();
- // validate msg-size (validate uncompressed-payload size for batch as we can't discard later on while building a
- // batch)
- if (compressedSize > PulsarDecoder.MaxMessageSize) {
- compressedPayload.release();
- String compressedStr = (!isBatchMessagingEnabled() && conf.getCompressionType() != CompressionType.NONE)
- ? "Compressed"
- : "";
- PulsarClientException.InvalidMessageException invalidMessageException =
- new PulsarClientException.InvalidMessageException(
- format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize,
- PulsarDecoder.MaxMessageSize));
- callback.sendComplete(invalidMessageException);
- return;
+ // validate msg-size (for batching, the size is checked when the batch is completed)
+ int compressedSize = compressedPayload.readableBytes();
+
+ if (compressedSize > PulsarDecoder.MaxMessageSize) {
+ compressedPayload.release();
+ String compressedStr = (!isBatchMessagingEnabled() && conf.getCompressionType() != CompressionType.NONE)
+ ? "Compressed"
+ : "";
+ PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException(
+ format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize,
+ PulsarDecoder.MaxMessageSize));
+ callback.sendComplete(invalidMessageException);
+ return;
+ }
}
if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
@@ -1286,12 +1285,23 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
ByteBuf compressedPayload = batchMessageContainer.getCompressedBatchMetadataAndPayload();
long sequenceId = batchMessageContainer.sequenceId;
ByteBuf encryptedPayload = encryptMessage(batchMessageContainer.messageMetadata, compressedPayload);
+
ByteBufPair cmd = sendMessage(producerId, sequenceId, batchMessageContainer.numMessagesInBatch,
batchMessageContainer.setBatchAndBuild(), encryptedPayload);
op = OpSendMsg.create(batchMessageContainer.messages, cmd, sequenceId,
batchMessageContainer.firstCallback);
+ if (encryptedPayload.readableBytes() > PulsarDecoder.MaxMessageSize) {
+ cmd.release();
+ semaphore.release(numMessagesInBatch);
+ if (op != null) {
+ op.callback.sendComplete(new PulsarClientException.InvalidMessageException(
+ "Message size is bigger than " + PulsarDecoder.MaxMessageSize + " bytes"));
+ }
+ return;
+ }
+
op.setNumMessagesInBatch(batchMessageContainer.numMessagesInBatch);
op.setBatchSizeByte(batchMessageContainer.currentBatchSizeBytes);