You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/14 17:04:16 UTC

[pulsar] branch master updated: In Java allow messages that compress to <5mb to be sent with batching enabled (#3718)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b3fed9  In Java allow messages that compress to <5mb to be sent with batching enabled (#3718)
4b3fed9 is described below

commit 4b3fed9bd43f0df18bacf947bc9c073841dec2c5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 14 10:04:08 2019 -0700

    In Java allow messages that compress to <5mb to be sent with batching enabled (#3718)
    
    * In Java allow messages that compress to <5mb to be sent with batching enabled
    
    * The test is no longer accurate since the behavior changed, and the scenario is already covered in ProducerConsumerTest
---
 .../client/api/SimpleProducerConsumerTest.java     | 27 +++++---
 .../client/api/v1/V1_ProducerConsumerTest.java     | 75 ----------------------
 .../pulsar/client/impl/BatchMessageContainer.java  |  2 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    | 40 +++++++-----
 4 files changed, 45 insertions(+), 99 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index d51c686..af9be10 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -651,9 +651,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
      * <pre>
      * send msg with size > MAX_SIZE (5 MB)
      * a. non-batch with compression: pass
-     * b. batch-msg with compression: fail
+     * b. batch-msg with compression: pass
      * c. non-batch w/o  compression: fail
      * d. non-batch with compression, consumer consume: pass
+     * e. batch-msg w/o compression: fail
      * </pre>
      *
      * @throws Exception
@@ -673,18 +674,13 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
         producer.close();
 
-        // (b) batch-msg
+        // (b) batch-msg with compression
         producer = pulsarClient.newProducer().topic(topic)
             .enableBatching(true)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .compressionType(CompressionType.LZ4)
             .create();
-        try {
-            producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
-            fail("Should have thrown exception");
-        } catch (PulsarClientException.InvalidMessageException e) {
-            // OK
-        }
+        producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
         producer.close();
 
         // (c) non-batch msg without compression
@@ -712,6 +708,21 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         producer.send(content);
         assertEquals(consumer.receive().getData(), content);
         producer.close();
+
+        // (e) batch-msg w/o compression
+        producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(true)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(CompressionType.NONE)
+            .create();
+        try {
+            producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+            fail("Should have thrown exception");
+        } catch (PulsarClientException.InvalidMessageException e) {
+            // OK
+        }
+        producer.close();
+
         consumer.close();
 
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index ab7163a..161c374 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -618,81 +618,6 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
     }
 
     /**
-     * Verifies non-batch message size being validated after performing compression while batch-messaging validates
-     * before compression of message
-     *
-     * <pre>
-     * send msg with size > MAX_SIZE (5 MB)
-     * a. non-batch with compression: pass
-     * b. batch-msg with compression: fail
-     * c. non-batch w/o  compression: fail
-     * d. non-batch with compression, consumer consume: pass
-     * </pre>
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testSendBigMessageSizeButCompressed() throws Exception {
-        log.info("-- Starting {} test --", methodName);
-
-        final String topic = "persistent://my-property/use/my-ns/bigMsg";
-
-        // (a) non-batch msg with compression
-        Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic(topic)
-                .enableBatching(false)
-                .compressionType(CompressionType.LZ4)
-                .create();
-        producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
-        producer.close();
-
-        // (b) batch-msg
-        producer = pulsarClient.newProducer()
-                .topic(topic)
-                .enableBatching(true)
-                .compressionType(CompressionType.LZ4)
-                .create();
-        try {
-            producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
-            fail("Should have thrown exception");
-        } catch (PulsarClientException.InvalidMessageException e) {
-            // OK
-        }
-        producer.close();
-
-        // (c) non-batch msg without compression
-        producer = pulsarClient.newProducer()
-                .topic(topic)
-                .enableBatching(false)
-                .compressionType(CompressionType.NONE)
-                .create();
-        try {
-            producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
-            fail("Should have thrown exception");
-        } catch (PulsarClientException.InvalidMessageException e) {
-            // OK
-        }
-        producer.close();
-
-        // (d) non-batch msg with compression and try to consume message
-        producer = pulsarClient.newProducer()
-                .topic(topic)
-                .enableBatching(false)
-                .compressionType(CompressionType.LZ4)
-                .create();
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("sub1")
-                .subscribe();
-        byte[] content = new byte[PulsarDecoder.MaxMessageSize + 10];
-        producer.send(content);
-        assertEquals(consumer.receive().getValue(), content);
-        producer.close();
-        consumer.close();
-
-    }
-
-    /**
      * Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages -
      * EntryCache should be cleaned : Once active subscription consumes messages
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
index 4d2ca09..0be5aae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
@@ -92,7 +92,7 @@ class BatchMessageContainer {
             sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
             this.firstCallback = callback;
             batchedMessageMetadataAndPayload = PooledByteBufAllocator.DEFAULT
-                    .buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES), PulsarDecoder.MaxMessageSize);
+                    .buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES));
         }
 
         if (previousCallback != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a393b48..6dcaca0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -302,22 +302,21 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         if (!isBatchMessagingEnabled()) {
             compressedPayload = compressor.encode(payload);
             payload.release();
-        }
-        int compressedSize = compressedPayload.readableBytes();
 
-        // validate msg-size (validate uncompressed-payload size for batch as we can't discard later on while building a
-        // batch)
-        if (compressedSize > PulsarDecoder.MaxMessageSize) {
-            compressedPayload.release();
-            String compressedStr = (!isBatchMessagingEnabled() && conf.getCompressionType() != CompressionType.NONE)
-                    ? "Compressed"
-                    : "";
-            PulsarClientException.InvalidMessageException invalidMessageException =
-                    new PulsarClientException.InvalidMessageException(
-                            format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize,
-                                    PulsarDecoder.MaxMessageSize));
-            callback.sendComplete(invalidMessageException);
-            return;
+            // validate msg-size (for batching, the size is checked on the completed batch instead)
+            int compressedSize = compressedPayload.readableBytes();
+
+            if (compressedSize > PulsarDecoder.MaxMessageSize) {
+                compressedPayload.release();
+                String compressedStr = (!isBatchMessagingEnabled() && conf.getCompressionType() != CompressionType.NONE)
+                        ? "Compressed"
+                        : "";
+                PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException(
+                        format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize,
+                                PulsarDecoder.MaxMessageSize));
+                callback.sendComplete(invalidMessageException);
+                return;
+            }
         }
 
         if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
@@ -1286,12 +1285,23 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 ByteBuf compressedPayload = batchMessageContainer.getCompressedBatchMetadataAndPayload();
                 long sequenceId = batchMessageContainer.sequenceId;
                 ByteBuf encryptedPayload = encryptMessage(batchMessageContainer.messageMetadata, compressedPayload);
+
                 ByteBufPair cmd = sendMessage(producerId, sequenceId, batchMessageContainer.numMessagesInBatch,
                         batchMessageContainer.setBatchAndBuild(), encryptedPayload);
 
                 op = OpSendMsg.create(batchMessageContainer.messages, cmd, sequenceId,
                         batchMessageContainer.firstCallback);
 
+                if (encryptedPayload.readableBytes() > PulsarDecoder.MaxMessageSize) {
+                    cmd.release();
+                    semaphore.release(numMessagesInBatch);
+                    if (op != null) {
+                        op.callback.sendComplete(new PulsarClientException.InvalidMessageException(
+                                "Message size is bigger than " + PulsarDecoder.MaxMessageSize + " bytes"));
+                    }
+                    return;
+                }
+
                 op.setNumMessagesInBatch(batchMessageContainer.numMessagesInBatch);
                 op.setBatchSizeByte(batchMessageContainer.currentBatchSizeBytes);