You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/01 08:34:27 UTC

[GitHub] [pulsar] eolivelli opened a new pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

eolivelli opened a new pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396


   ### Motivation
   
   if you do not set an initial schema to the Producer the schema must be prepared at the first message with a Schema.
   There is a bug and compression is not applied in this case, and the consumer receives an uncompressed message, failing
   
   ### Modifications
   Ensure that compression is applied in the code path during the schema preparation.
   
   With enableBatching=true the compression is usually applied per batch, but in case of schema preparation the code flows down to a separate path and we have to enforce compression.
   
   ### Verifying this change
   
   This change added tests that failed without this fix
   
   ### Does this pull request potentially affect one of the following parts:
   The patch applied to producers that enable batching, compression and do not send an initial schema to the producer.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#issuecomment-770998554


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#discussion_r569110035



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -383,11 +394,12 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // If compression is enabled, we are compressing, otherwise it will simply use the same buffer
         ByteBuf compressedPayload = payload;
+        boolean compressed = false;
         // Batch will be compressed when closed
         // If a message has a delayed delivery time, we'll always send it individually
         if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {

Review comment:
       @eolivelli Is change to `!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime() || isSchemaReady` works? It's not a big problem, I just noticed the `compressed` passed to the following methods, if we can make the compression works here for the multiple schema producer, I think we can void pass the `compressed`.
   
   The change looks good to me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#issuecomment-771438118


   @codelipenghui I will try to implement your idea.
   
   @aahmed-se and @merlimat you had already commented. What's your opinion?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#discussion_r568261578



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -365,6 +365,21 @@ public void addCallback(MessageImpl<?> msg, SendCallback scb) {
         }
     }
 
+    /**
+     * Compress the payload if compression is configured
+     * @param payload
+     * @return a new payload
+     */
+    private ByteBuf applyCompression(ByteBuf payload) {
+        if (conf.getCompressionType() != CompressionType.NONE) {

Review comment:
       This check is not necessary. The `CompressionCodecNone` will already mimic the same behavior with respect to the reference counting.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#issuecomment-771480353


   @merlimat I have addressed your comment, please take a look


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#issuecomment-771479204


   @codelipenghui I have checked `isBatchMessagingEnabled()`, and in my opinion changing that method will have a big impact and I will have to make changes in lots of points in ProducerImpl.
   I think it is not safe to change `isBatchMessagingEnabled()`
   
   I don't like that names for variables but I did not find a better name and also I wanted to keep the patch as small as possible


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#discussion_r568382715



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -365,6 +365,21 @@ public void addCallback(MessageImpl<?> msg, SendCallback scb) {
         }
     }
 
+    /**
+     * Compress the payload if compression is configured
+     * @param payload
+     * @return a new payload
+     */
+    private ByteBuf applyCompression(ByteBuf payload) {
+        if (conf.getCompressionType() != CompressionType.NONE) {

Review comment:
       Sure. I will simplify this method




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat merged pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#issuecomment-770987800


   /pulsar-bot run-failure-tests


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#discussion_r569188428



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -383,11 +394,12 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // If compression is enabled, we are compressing, otherwise it will simply use the same buffer
         ByteBuf compressedPayload = payload;
+        boolean compressed = false;
         // Batch will be compressed when closed
         // If a message has a delayed delivery time, we'll always send it individually
         if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {

Review comment:
       I would prefer to keep the code simpler.
   it is already quite complicated.
   
   thanks @codelipenghui  for your review!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#issuecomment-770832790


   /pulsar-bot run-failure-tests


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#discussion_r568261578



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -365,6 +365,21 @@ public void addCallback(MessageImpl<?> msg, SendCallback scb) {
         }
     }
 
+    /**
+     * Compress the payload if compression is configured
+     * @param payload
+     * @return a new payload
+     */
+    private ByteBuf applyCompression(ByteBuf payload) {
+        if (conf.getCompressionType() != CompressionType.NONE) {

Review comment:
       This check is not necessary. The `CompressionCodecNone` will already mimic the same behavior with respect to the reference counting.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#issuecomment-772303026


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#issuecomment-771438118






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9396: Compression must be applied during deferred schema preparation and enableBatching is enabled

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9396:
URL: https://github.com/apache/pulsar/pull/9396#discussion_r568382715



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -365,6 +365,21 @@ public void addCallback(MessageImpl<?> msg, SendCallback scb) {
         }
     }
 
+    /**
+     * Compress the payload if compression is configured
+     * @param payload
+     * @return a new payload
+     */
+    private ByteBuf applyCompression(ByteBuf payload) {
+        if (conf.getCompressionType() != CompressionType.NONE) {

Review comment:
       Sure. I will simplify this method




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org