Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/21 10:59:06 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request, #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

codelipenghui opened a new pull request, #16160:
URL: https://github.com/apache/pulsar/pull/16160

   ### Motivation
   
   The consumer applies the default batch receive policy even if the user never calls the batch receive API.
   
   https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61
   
   As a result, every consumer periodically runs the batch receive timeout task, which consumes a lot of CPU when the client has many consumers (100k consumers).
   
   <img width="1846" alt="image" src="https://user-images.githubusercontent.com/12592133/174783886-a4da085d-2cb8-4153-ba96-19414be65502.png">
   
   [consumer-cpu-threads.html.txt](https://github.com/apache/pulsar/files/8948238/consumer-cpu-threads.html.txt)
   
   The Pulsar perf tool can also reproduce the problem if the test is run with many consumers.
   
   ### Modification
   
   If a consumer has no pending batch receive operation, there is no need to trigger the
   batch timeout task periodically. Start the timeout check only after a batch
   receive request has been added to the pending request queue.
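
The idea above can be sketched as follows. This is a minimal illustration, not the code in this PR: `LazyTimeoutConsumer` and its methods are hypothetical names, a single-threaded `ScheduledExecutorService` stands in for both `internalPinnedExecutor` and the client timer, and every pending request is completed by one timeout for simplicity.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer; this is NOT the Pulsar ConsumerBase API.
class LazyTimeoutConsumer {
    // One thread plays the role of both the pinned executor and the timer (assumption).
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    // Only touched from the executor thread, so no lock is needed.
    private final Queue<CompletableFuture<String>> pending = new ArrayDeque<>();
    private final long timeoutMs;
    // Written only on the executor thread; volatile so other threads can read it.
    private volatile ScheduledFuture<?> timeoutTask;

    LazyTimeoutConsumer(long timeoutMs) {
        // The constructor deliberately schedules nothing.
        this.timeoutMs = timeoutMs;
    }

    CompletableFuture<String> batchReceiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        executor.execute(() -> {
            pending.add(result);
            // The timer starts only once a request is actually waiting.
            triggerTimeoutTask();
        });
        return result;
    }

    private void triggerTimeoutTask() {
        // Arm at most one timer per consumer.
        if (timeoutTask == null && timeoutMs > 0) {
            timeoutTask = executor.schedule(this::onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    private void onTimeout() {
        timeoutTask = null;
        CompletableFuture<String> op;
        // Simplification: complete every pending request with an empty result.
        while ((op = pending.poll()) != null) {
            op.complete("timed out with an empty batch");
        }
    }

    boolean hasTimeoutTask() {
        return timeoutTask != null;
    }

    void close() {
        executor.shutdownNow();
    }
}
```

With this shape, a consumer that never calls `batchReceiveAsync()` never schedules a timer, so idle consumers add no periodic work.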
   
   Remove the lock in MultiTopicsConsumerImpl as #10352 does
   
   ### Verification
   
   Added a new test to verify that the batch receive timeout task does not start when
   there is no batch receive request.
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16160:
URL: https://github.com/apache/pulsar/pull/16160#discussion_r1025384855


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -695,18 +688,15 @@ private ConsumerConfigurationData<T> getInternalConsumerConfig() {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        lock.writeLock().lock();
-        try {
+        internalPinnedExecutor.execute(() -> {
             CONSUMER_EPOCH.incrementAndGet(this);

Review Comment:
   If `CONSUMER_EPOCH.incrementAndGet(this);` runs inside `internalPinnedExecutor`, the epoch will not work.





[GitHub] [pulsar] github-actions[bot] commented on pull request #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16160:
URL: https://github.com/apache/pulsar/pull/16160#issuecomment-1161602095

   @codelipenghui Please provide a correct documentation label for your PR.
   Instructions see [Pulsar Documentation Label Guide](https://docs.google.com/document/d/1Qw7LHQdXWBW9t2-r-A7QdFDBwmZh6ytB4guwMoXHqc0).




[GitHub] [pulsar] codelipenghui merged pull request #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #16160:
URL: https://github.com/apache/pulsar/pull/16160




[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #16160:
URL: https://github.com/apache/pulsar/pull/16160#discussion_r902874267


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -164,12 +164,14 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
 
+        initReceiverQueueSize();
+    }
+
+    protected void triggerBatchReceiveTimeoutTask() {

Review Comment:
   If I am reading this correctly, this method will get called with every call to `batchReceive` or `batchReceiveAsync`. That means we could end up with several concurrent timer tasks for a single consumer. The current solution only has one task at a time per consumer. Given that the `doPendingBatchReceiveTask` method schedules the next task based on the `diff` to the next `opBatchReceive` timeout, I think this method should only schedule a timer task when `batchReceiveTimeout` is `null`.
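
To illustrate the single-timer behaviour described in this comment, here is a small sketch of a timer body that re-arms itself for the remaining time (`diff`) of the oldest pending request, and of an `add` path that arms a timer only when none is armed. The class `PendingTimeouts` and its members are hypothetical; instead of using a real timer, the sketch records each delay it would schedule, and the clock is passed in explicitly.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the pending-request timeout logic; not Pulsar code.
class PendingTimeouts {
    // Creation time (ms) of each pending request, oldest first.
    private final Queue<Long> createdAt = new ArrayDeque<>();
    private final long timeoutMs;
    private boolean timerArmed;
    // Every delay that would have been handed to a timer, in order.
    final List<Long> scheduledDelays = new ArrayList<>();

    PendingTimeouts(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called when a batch receive request has to wait.
    void add(long nowMs) {
        createdAt.add(nowMs);
        if (!timerArmed) {
            // Arm a timer only if none is armed, so there is one task per consumer.
            arm(timeoutMs);
        }
    }

    // Body of the timer task; returns how many requests expired.
    int onTimerFired(long nowMs) {
        timerArmed = false;
        int expired = 0;
        while (!createdAt.isEmpty()) {
            long diff = timeoutMs - (nowMs - createdAt.peek());
            if (diff > 0) {
                // The oldest remaining request is not due yet: re-arm for the rest of its timeout.
                arm(diff);
                break;
            }
            createdAt.poll();
            expired++;
        }
        return expired;
    }

    boolean isTimerArmed() {
        return timerArmed;
    }

    private void arm(long delayMs) {
        timerArmed = true;
        scheduledDelays.add(delayMs);
    }
}
```

With a 100 ms timeout and requests added at 0, 30 and 60 ms, only the first `add` arms a timer; each firing expires one request and re-arms for the 30 ms left on the next one, and the last firing leaves no timer armed.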





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16160:
URL: https://github.com/apache/pulsar/pull/16160#discussion_r903240526


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -164,12 +164,14 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
 
+        initReceiverQueueSize();
+    }
+
+    protected void triggerBatchReceiveTimeoutTask() {

Review Comment:
   @michaeljmarshall Fixed.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16160:
URL: https://github.com/apache/pulsar/pull/16160#discussion_r903238224


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -164,12 +164,14 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
 
+        initReceiverQueueSize();
+    }
+
+    protected void triggerBatchReceiveTimeoutTask() {

Review Comment:
   > If I am reading this correctly, this method will get called with every call to batchReceive or batchReceiveAsync.
   
   No, we only call this method if there are no messages in the receiver queue.
   
   > I think this method should only schedule a timer task when batchReceiveTimeout is null.
   
   Yes, that makes sense. I think I missed this part.





[GitHub] [pulsar] Jason918 commented on a diff in pull request #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #16160:
URL: https://github.com/apache/pulsar/pull/16160#discussion_r903325352


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -164,12 +164,14 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
 
-        if (batchReceivePolicy.getTimeoutMs() > 0) {
+        initReceiverQueueSize();
+    }
+
+    protected void triggerBatchReceiveTimeoutTask() {
+        if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 0) {

Review Comment:
   My bad, `doPendingBatchReceiveTask` runs in `internalPinnedExecutor`





[GitHub] [pulsar] Jason918 commented on a diff in pull request #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #16160:
URL: https://github.com/apache/pulsar/pull/16160#discussion_r903259474


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -164,12 +164,14 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
 
-        if (batchReceivePolicy.getTimeoutMs() > 0) {
+        initReceiverQueueSize();
+    }
+
+    protected void triggerBatchReceiveTimeoutTask() {
+        if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 0) {

Review Comment:
   Both the `internalPinnedExecutor` and `client.timer()` threads change `batchReceiveTimeout`, which does not seem thread safe.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16160:
URL: https://github.com/apache/pulsar/pull/16160#discussion_r903277386


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -164,12 +164,14 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
 
-        if (batchReceivePolicy.getTimeoutMs() > 0) {
+        initReceiverQueueSize();
+    }
+
+    protected void triggerBatchReceiveTimeoutTask() {
+        if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 0) {

Review Comment:
   The timer thread does not access `batchReceiveTimeout`:
   
   https://github.com/apache/pulsar/pull/16160/files#diff-7a1c869c4bbefdfc72fcee6074b4dfeeec318334d48d998c1cff5e09713b0834R951-R953
   
   But I found another place: `closeConsumerTasks` also accesses `batchReceiveTimeout`, and it can run on the IO thread or the user's thread. It only reads `batchReceiveTimeout` and does not change it, and `batchReceiveTimeout` is `volatile`.





[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #16160: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #16160:
URL: https://github.com/apache/pulsar/pull/16160#discussion_r903940695


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -164,12 +164,14 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
 
+        initReceiverQueueSize();
+    }
+
+    protected void triggerBatchReceiveTimeoutTask() {

Review Comment:
   Thanks!


