Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/21 13:48:21 UTC

[GitHub] [pulsar] Jason918 opened a new pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Jason918 opened a new pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400


   
   ### Motivation
   
   This is part of the work for [PIP 74](https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits).
   We need to dynamically update `currentReceiverQueue` to control client memory.
   
   ### Modifications
   
   Add getter and setter methods for `ConsumerBase#maxReceiverQueueSize`.
   - For `ConsumerImpl`, we need to update `availablePermits` together with it.
   - For `MultiTopicsConsumerImpl`, we need to update the inner consumers together with it and trigger any paused consumers.
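
The two bullets above can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: `SketchConsumer`, `SketchMultiTopicsConsumer`, and the plain permit counter are hypothetical stand-ins, and adjusting the permits by the size delta is an assumption about what "update `availablePermits` together" means. Resuming paused inner consumers is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical stand-in for a single-topic consumer; not the real ConsumerImpl.
class SketchConsumer {
    private static final AtomicIntegerFieldUpdater<SketchConsumer> MAX_RECEIVER_QUEUE_SIZE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SketchConsumer.class, "maxReceiverQueueSize");

    // Must be a volatile int for AtomicIntegerFieldUpdater to accept it.
    private volatile int maxReceiverQueueSize;
    // Stand-in for the permits the consumer may still grant to the broker.
    private final AtomicInteger availablePermits = new AtomicInteger();

    SketchConsumer(int receiverQueueSize) {
        MAX_RECEIVER_QUEUE_SIZE_UPDATER.set(this, receiverQueueSize);
    }

    int getMaxReceiverQueueSize() {
        return MAX_RECEIVER_QUEUE_SIZE_UPDATER.get(this);
    }

    int getAvailablePermits() {
        return availablePermits.get();
    }

    void setMaxReceiverQueueSize(int newSize) {
        // Swap the limit atomically, then move the permits by the same delta
        // (assumption): growing the queue adds permits, shrinking removes them.
        int oldSize = MAX_RECEIVER_QUEUE_SIZE_UPDATER.getAndSet(this, newSize);
        availablePermits.addAndGet(newSize - oldSize);
    }
}

// Hypothetical stand-in for a multi-topics consumer that owns inner consumers.
class SketchMultiTopicsConsumer {
    private final List<SketchConsumer> consumers;
    private volatile int maxReceiverQueueSize;

    SketchMultiTopicsConsumer(List<SketchConsumer> inner, int receiverQueueSize) {
        this.consumers = new ArrayList<>(inner);
        this.maxReceiverQueueSize = receiverQueueSize;
    }

    void setMaxReceiverQueueSize(int newSize) {
        this.maxReceiverQueueSize = newSize;
        // Propagate the new limit to every inner consumer.
        for (SketchConsumer consumer : consumers) {
            consumer.setMaxReceiverQueueSize(newSize);
        }
    }
}

class ReceiverQueueSketch {
    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer(1000);
        consumer.setMaxReceiverQueueSize(1500);
        System.out.println(consumer.getMaxReceiverQueueSize() + " " + consumer.getAvailablePermits());
    }
}
```

Using `getAndSet` keeps the read of the old size and the write of the new one atomic, so two concurrent resizes cannot both compute their delta from the same stale value.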
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   
   This change added tests and can be verified as follows:
   
     - org.apache.pulsar.client.impl.DynamicReceiverQueueSizeTest
     - ConsumerImplTest#testMaxReceiverQueueSize
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
     
   - [x] `no-need-doc` 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 edited a comment on pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
Jason918 edited a comment on pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#issuecomment-1050817392


   > I think we need to handle the `ZeroQueueConsumerImpl` better since that case is not eligible for dynamic queue resizing, at least based on the current class design.
   
   Good catch.
   
   > I think that our API should be consistent when possible, and since `receiverQueueSize` has been on the builder since 2018, I'd prefer to update the new setter method to `setReceiverQueueSize`
   
   Good point. Previously, I used `maxReceiverQueueSize` because `receiverQueueSize` can be read as the current size of `incomingMessages`. Actually, I think `receiverQueueCapacity` may be a more fitting name than `receiverQueueSize`. So let's leave it be and not change the concept at this moment.
   
   





[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#discussion_r813018907



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1610,6 +1607,16 @@ public void increaseAvailablePermits(int delta) {
         increaseAvailablePermits(cnx(), delta);
     }
 
+    public void setMaxReceiverQueueSize(int newSize) {

Review comment:
       Nit: I think we want to add the `@Override` annotation to this method.







[GitHub] [pulsar] mattisonchao commented on a change in pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on a change in pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#discussion_r812931477



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -119,6 +122,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
         this.interceptors = interceptors;
+        MAX_RECEIVER_QUEUE_SIZE_UPDATER.set(this, receiverQueueSize);

Review comment:
       Should we remove line:108?







[GitHub] [pulsar] Jason918 commented on a change in pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#discussion_r814739648



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -721,9 +725,7 @@ public String toString() {
                 + '}';
     }
 
-    protected void setMaxReceiverQueueSize(int newSize) {
-        this.maxReceiverQueueSize = newSize;
-    }
+    protected abstract void setMaxReceiverQueueSize(int newSize);

Review comment:
       > I think we could add this method to the `Consumer` interface.
   
   Actually, let's not do this. One of the motivations of PIP-74 is to decommission the receiver queue size setting by auto-adjusting it internally.







[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#discussion_r812545471



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1610,6 +1607,16 @@ public void increaseAvailablePermits(int delta) {
         increaseAvailablePermits(cnx(), delta);
     }
 
+    public void setMaxReceiverQueueSize(int newSize) {

Review comment:
       Do you think it'd be worth verifying that the `newSize` is positive? I see that the previous implementation did not verify the input, but a non-positive value will completely halt the consumer. Although, it's possible that someone was using that as a feature to effectively "pause" the consumer. Maybe we need to leave it as is. Let me know what you think.







[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#discussion_r815209200



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -721,9 +725,7 @@ public String toString() {
                 + '}';
     }
 
-    protected void setMaxReceiverQueueSize(int newSize) {
-        this.maxReceiverQueueSize = newSize;
-    }
+    protected abstract void setMaxReceiverQueueSize(int newSize);

Review comment:
       I see. I was basing my comment on the fact that the initial commit had `setMaxReceiverQueueSize` set as a public method in the `ConsumerImpl`. Assuming that it is meant to be `protected`, I agree that this method does not belong in the `Consumer` interface.







[GitHub] [pulsar] Jason918 edited a comment on pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
Jason918 edited a comment on pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#issuecomment-1050817392


   > I think we need to handle the `ZeroQueueConsumerImpl` better since that case is not eligible for dynamic queue resizing, at least based on the current class design.
   
   Good catch.
   
   > I think that our API should be consistent when possible, and since `receiverQueueSize` has been on the builder since 2018, I'd prefer to update the new setter method to `setReceiverQueueSize`
   
   Good point. Previously, I used `maxReceiverQueueSize` because `receiverQueueSize` can be read as the current size of `incomingMessages`. Actually, I think `receiverQueueCapacity` may be a more fitting name than `receiverQueueSize`. So let's leave it be and not change the concept at this moment.
   
   Yes, it's 





[GitHub] [pulsar] Jason918 commented on a change in pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#discussion_r814727187



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1610,6 +1607,16 @@ public void increaseAvailablePermits(int delta) {
         increaseAvailablePermits(cnx(), delta);
     }
 
+    public void setMaxReceiverQueueSize(int newSize) {

Review comment:
       > Do you think it'd be worth verifying that the `newSize` is positive? I see that the previous implementation did not verify the input, but a non-positive value will completely halt the consumer. Although, it's possible that someone was using that as a feature to effectively "pause" the consumer. Maybe we need to leave it as is. Let me know what you think.
   
   Added `checkArgument`. We already have a `pause` method, and a negative queue size is too tricky to support, so let's not encourage this.
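
A rough sketch of that guard, for illustration only. The `checkArgument` here is a local helper with the same contract as Guava's `Preconditions.checkArgument(boolean, Object)`, which is presumably what the PR uses; the class name and the error message are hypothetical.

```java
// Hypothetical holder for the queue limit; not the real ConsumerImpl.
class QueueSizeValidationSketch {
    private volatile int maxReceiverQueueSize = 1000;

    // Local stand-in mirroring Guava's Preconditions.checkArgument(boolean, Object):
    // throws IllegalArgumentException when the condition is false.
    static void checkArgument(boolean expression, String message) {
        if (!expression) {
            throw new IllegalArgumentException(message);
        }
    }

    int getMaxReceiverQueueSize() {
        return maxReceiverQueueSize;
    }

    void setMaxReceiverQueueSize(int newSize) {
        // Reject zero and negative sizes before touching any state, so a bad
        // call cannot halt the consumer by accident.
        checkArgument(newSize > 0, "receiver queue size should be larger than 0");
        this.maxReceiverQueueSize = newSize;
    }

    public static void main(String[] args) {
        QueueSizeValidationSketch consumer = new QueueSizeValidationSketch();
        try {
            consumer.setMaxReceiverQueueSize(-1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Validating before the assignment means a rejected call leaves the previous limit in place.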

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1610,6 +1607,16 @@ public void increaseAvailablePermits(int delta) {
         increaseAvailablePermits(cnx(), delta);
     }
 
+    public void setMaxReceiverQueueSize(int newSize) {

Review comment:
       > Nit: I think we want to add the `@Override` annotation to this method.
   
   done
   







[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#discussion_r813032788



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -721,9 +725,7 @@ public String toString() {
                 + '}';
     }
 
-    protected void setMaxReceiverQueueSize(int newSize) {
-        this.maxReceiverQueueSize = newSize;
-    }
+    protected abstract void setMaxReceiverQueueSize(int newSize);

Review comment:
       I think we could add this method to the `Consumer` interface. That would allow us to make it a first class member of the API, which would also make it easier to document for end users via the Javadoc. The related `receiverQueueSize` is defined on the `ConsumerBuilder`, so there is already a precedent for this kind of method being on the top level interface. Let me know what you think.
   
   Additionally, note that this method is currently exposed to the `ZeroQueueConsumerImpl` and would result in the consumer sending flow control permits. I think we should throw a `NotImplemented` exception in that class since the whole point of that class is that the queue is size 0. Assuming we go in this direction, we'll need to update the `ConsumerBuilder#receiverQueueSize` javadoc to indicate that consumers with size 0 are not eligible for queue resizing.
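
The second suggestion could look roughly like the sketch below. The class names are hypothetical, and `UnsupportedOperationException` is swapped in as the closest standard Java exception to the `NotImplemented` exception mentioned above; the exception the PR finally used is not shown in this thread.

```java
// Hypothetical base class exposing the resize hook; not the real ConsumerBase.
abstract class ResizableConsumerSketch {
    protected abstract void setMaxReceiverQueueSize(int newSize);
}

// Hypothetical zero-queue variant: its queue is fixed at size 0, so resizing is refused.
class ZeroQueueConsumerSketch extends ResizableConsumerSketch {
    @Override
    protected void setMaxReceiverQueueSize(int newSize) {
        // Refuse instead of silently sending flow-control permits.
        throw new UnsupportedOperationException(
                "Receiver queue size cannot be changed for a zero-queue consumer");
    }
}

class ZeroQueueSketchDemo {
    public static void main(String[] args) {
        ResizableConsumerSketch consumer = new ZeroQueueConsumerSketch();
        try {
            consumer.setMaxReceiverQueueSize(10);
        } catch (UnsupportedOperationException e) {
            System.out.println("not resizable: " + e.getMessage());
        }
    }
}
```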




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#issuecomment-1047457540


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Jason918 commented on pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#issuecomment-1050817392


   > I think we need to handle the `ZeroQueueConsumerImpl` better since that case is not eligible for dynamic queue resizing, at least based on the current class design.
   
   Good catch.
   
   > I think that our API should be consistent when possible, and since `receiverQueueSize` has been on the builder since 2018, I'd prefer to update the new setter method to `setReceiverQueueSize`
   
   Good point. Previously, I used `maxReceiverQueueSize` because `receiverQueueSize` can be read as the current size of `incommingQueue`. Actually, I think `receiverQueueCapacity` may be a more fitting name than `receiverQueueSize`. So let's leave it be and not change the concept at this moment.
   
   Yes, it's 





[GitHub] [pulsar] Jason918 commented on a change in pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#discussion_r814739648



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -721,9 +725,7 @@ public String toString() {
                 + '}';
     }
 
-    protected void setMaxReceiverQueueSize(int newSize) {
-        this.maxReceiverQueueSize = newSize;
-    }
+    protected abstract void setMaxReceiverQueueSize(int newSize);

Review comment:
       > I think we could add this method to the `Consumer` interface.
   
   +1







[GitHub] [pulsar] Jason918 commented on pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400#issuecomment-1050812988


   > it looks like it will auto-scale the `queue size`
   
   Yes, that's exactly what PIP-74 wants to do.
   
   > maybe we can change the `receiverQueueSize` to `initialReceiverQueueSize` and add a doc to explain it.
   
   It would actually be the max value of the auto-scaled receiver queue size.





[GitHub] [pulsar] codelipenghui merged pull request #14400: [PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #14400:
URL: https://github.com/apache/pulsar/pull/14400


   

