You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/02 15:34:30 UTC

[GitHub] [pulsar] hangc0276 opened a new pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

hangc0276 opened a new pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862


   Fix #6854 
   
   ### Bug description
   The consumer stuck due to `hasEnoughMessagesForBatchReceive` checking:
   ```
   protected boolean hasEnoughMessagesForBatchReceive() {
           if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
               return false;
           }
           return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
                   || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes());
       }
   ```
   
   ### Changes
   When batchReceivePolicy maxNumMessages > maxReceiverQueueSize, we force batch receivePolicy maxNumMessages to maxReceiverQueueSize to avoid consumer stuck when checking `hasEnoughMessagesForBatchReceive`


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] psilos commented on a change in pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
psilos commented on a change in pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#discussion_r419146461



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -95,7 +95,16 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
-            this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            if (conf.getBatchReceivePolicy().getMaxNumMessages() > this.maxReceiverQueueSize) {

Review comment:
       can we also add unit tests with the desired behaviour ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] avimas commented on a change in pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
avimas commented on a change in pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#discussion_r419105124



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -95,7 +95,16 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
-            this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            if (conf.getBatchReceivePolicy().getMaxNumMessages() > this.maxReceiverQueueSize) {
+                BatchReceivePolicy batchReceivePolicy = conf.getBatchReceivePolicy();
+                this.batchReceivePolicy = BatchReceivePolicy.builder()
+                        .maxNumMessages(this.maxReceiverQueueSize)
+                        .maxNumBytes(batchReceivePolicy.getMaxNumBytes())
+                        .timeout((int)batchReceivePolicy.getTimeoutMs()/1000, TimeUnit.SECONDS)
+                        .build();
+            } else {
+                this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            }
         } else {
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;

Review comment:
       I completely agree that throwing an exception is a better choice 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] avimas commented on a change in pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
avimas commented on a change in pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#discussion_r419102917



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -95,7 +95,16 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
-            this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            if (conf.getBatchReceivePolicy().getMaxNumMessages() > this.maxReceiverQueueSize) {
+                BatchReceivePolicy batchReceivePolicy = conf.getBatchReceivePolicy();
+                this.batchReceivePolicy = BatchReceivePolicy.builder()
+                        .maxNumMessages(this.maxReceiverQueueSize)
+                        .maxNumBytes(batchReceivePolicy.getMaxNumBytes())
+                        .timeout((int)batchReceivePolicy.getTimeoutMs()/1000, TimeUnit.SECONDS)
+                        .build();
+            } else {
+                this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            }
         } else {
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;

Review comment:
       The above solution is correct but users can get confused seeing their batchRecieve policy configuration has changed 
   Why not changing the function hasEnoughMessagesForBatchReceive to return true when 
   incomingMessages.size() >= maxReceiverQueueSize ?
   
    protected boolean hasEnoughMessagesForBatchReceive() {
           if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
               return false;
           }
           return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
                   || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()) || (incomingMessages.size() >= maxReceiverQueueSize);
       }
   
   What do you think ?
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on a change in pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#discussion_r421544869



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -95,7 +95,16 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
-            this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            if (conf.getBatchReceivePolicy().getMaxNumMessages() > this.maxReceiverQueueSize) {

Review comment:
       Thanks for your feedback, I have add the unit tests for all kinds of exception case, please take a look.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] avimas commented on a change in pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
avimas commented on a change in pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#discussion_r419102917



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -95,7 +95,16 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
-            this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            if (conf.getBatchReceivePolicy().getMaxNumMessages() > this.maxReceiverQueueSize) {
+                BatchReceivePolicy batchReceivePolicy = conf.getBatchReceivePolicy();
+                this.batchReceivePolicy = BatchReceivePolicy.builder()
+                        .maxNumMessages(this.maxReceiverQueueSize)
+                        .maxNumBytes(batchReceivePolicy.getMaxNumBytes())
+                        .timeout((int)batchReceivePolicy.getTimeoutMs()/1000, TimeUnit.SECONDS)
+                        .build();
+            } else {
+                this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            }
         } else {
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;

Review comment:
       The above solution is correct but users can get confused to see their batchRecieve policy configuration has changed 
   Why not changing the function hasEnoughMessagesForBatchReceive to return true when 
   incomingMessages.size() >= maxReceiverQueueSize ?
   
    protected boolean hasEnoughMessagesForBatchReceive() {
           if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
               return false;
           }
           return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
                   || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()) || (incomingMessages.size() >= maxReceiverQueueSize);
       }
   
   What do you think ?
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] avimas commented on a change in pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
avimas commented on a change in pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#discussion_r419102917



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -95,7 +95,16 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
-            this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            if (conf.getBatchReceivePolicy().getMaxNumMessages() > this.maxReceiverQueueSize) {
+                BatchReceivePolicy batchReceivePolicy = conf.getBatchReceivePolicy();
+                this.batchReceivePolicy = BatchReceivePolicy.builder()
+                        .maxNumMessages(this.maxReceiverQueueSize)
+                        .maxNumBytes(batchReceivePolicy.getMaxNumBytes())
+                        .timeout((int)batchReceivePolicy.getTimeoutMs()/1000, TimeUnit.SECONDS)
+                        .build();
+            } else {
+                this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            }
         } else {
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;

Review comment:
       The above solution is correct but it users can get confused to see their batchRecieve policy configuration has changed 
   Why not changing the function hasEnoughMessagesForBatchReceive to return true when 
   incomingMessages.size() >= maxReceiverQueueSize ?
   
    protected boolean hasEnoughMessagesForBatchReceive() {
           if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
               return false;
           }
           return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
                   || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()) || (incomingMessages.size() >= maxReceiverQueueSize);
       }
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#issuecomment-625289104


   @codelipenghui When batch receive triggered by timeout, it should trigger send permit to broker server in `MultiTopicsConsumerImpl`. Without the trigger, it will also stuck the consumer.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#issuecomment-625622208


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#issuecomment-625049323


   @hangc0276 Jia has started a discussion in the email thread for cutting 2.5.2 and it's better to include this fix in 2.5.2. If you have time, please help take a look at the comments, so that we can merge this PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#issuecomment-625287385


   > I think we should introduce an exception when the maxNumMessages greater than queue size when creating consumers. This will be more transparent to users.
   
   I have throw a log when reset the batch receive policy, please take a look, thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#issuecomment-625088478


   > @hangc0276 Jia has started a discussion in the email thread for cutting 2.5.2 and it's better to include this fix in 2.5.2. If you have time, please help take a look at the comments, so that we can merge this PR.
   
   sorry,I will fix this pr soon.  When i add test case for the batchReceivePolicy, I found another bugs, and the consumer still stuck when reset the maxNumMessages to receiverQueueSize. I will fix it soon.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#discussion_r419104478



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -95,7 +95,16 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
-            this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            if (conf.getBatchReceivePolicy().getMaxNumMessages() > this.maxReceiverQueueSize) {
+                BatchReceivePolicy batchReceivePolicy = conf.getBatchReceivePolicy();
+                this.batchReceivePolicy = BatchReceivePolicy.builder()
+                        .maxNumMessages(this.maxReceiverQueueSize)
+                        .maxNumBytes(batchReceivePolicy.getMaxNumBytes())
+                        .timeout((int)batchReceivePolicy.getTimeoutMs()/1000, TimeUnit.SECONDS)
+                        .build();
+            } else {
+                this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            }
         } else {
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;

Review comment:
       This will cause batch message return to the user earlier right? This will introduce another problem, users will get the batch message before reaching `maxNumMessage`. It's better to tell users the receiver queue size needs to greater than the `maxNumMessage` of the batch receive policy. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #6862: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6862:
URL: https://github.com/apache/pulsar/pull/6862#discussion_r419065174



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -95,7 +95,16 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
-            this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            if (conf.getBatchReceivePolicy().getMaxNumMessages() > this.maxReceiverQueueSize) {
+                BatchReceivePolicy batchReceivePolicy = conf.getBatchReceivePolicy();
+                this.batchReceivePolicy = BatchReceivePolicy.builder()
+                        .maxNumMessages(this.maxReceiverQueueSize)
+                        .maxNumBytes(batchReceivePolicy.getMaxNumBytes())
+                        .timeout((int)batchReceivePolicy.getTimeoutMs()/1000, TimeUnit.SECONDS)
+                        .build();
+            } else {
+                this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            }
         } else {
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;

Review comment:
       We also need to add a check for `DEFAULT_POLICY` since users may specify the queue size that lower that `maxNumMessages` of the `DEFAULT_POLICY`. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org