Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/27 06:14:55 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request, #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption

codelipenghui opened a new pull request, #16236:
URL: https://github.com/apache/pulsar/pull/16236

   ### Motivation
   
   The scheduled executor is not very efficient because every task is first added to its delay queue (`DelayedWorkQueue`, a priority queue), even when the task is submitted with `.execute()` and has no scheduling delay.
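   
   For illustration only, here is a minimal JDK-only sketch of that behaviour. The class and method names are hypothetical and not part of Pulsar. Per the JDK Javadoc, `ScheduledThreadPoolExecutor.execute(command)` is equivalent to `schedule(command, 0, anyUnit)`. So even a no-delay task is wrapped in a `ScheduledFutureTask` and placed on the delay heap, whereas a plain `ThreadPoolExecutor` queues the `Runnable` itself:
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ScheduledThreadPoolExecutor;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   
   public class ExecuteQueueDemo {
   
       /** Keeps the pool's single worker busy, calls execute(probe), and returns what was queued. */
       public static Object queuedElementAfterExecute(ThreadPoolExecutor pool, Runnable probe)
               throws InterruptedException {
           CountDownLatch started = new CountDownLatch(1);
           CountDownLatch release = new CountDownLatch(1);
           pool.execute(() -> {
               started.countDown();
               try {
                   release.await();
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
           });
           started.await();                     // the only worker is now busy...
           pool.execute(probe);                 // ...so this no-delay task has to wait in the queue
           Object queued = pool.getQueue().peek();
           release.countDown();
           pool.shutdown();
           pool.awaitTermination(5, TimeUnit.SECONDS);
           return queued;
       }
   
       public static void main(String[] args) throws InterruptedException {
           Runnable probe = () -> { };
           Object fromScheduled = queuedElementAfterExecute(new ScheduledThreadPoolExecutor(1), probe);
           Object fromPlain = queuedElementAfterExecute(
                   new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()), probe);
           System.out.println(fromScheduled.getClass().getSimpleName()); // ScheduledFutureTask
           System.out.println(fromPlain == probe);                        // true
       }
   }
   ```
   
   The sketch only shows where a task ends up. It does not measure the cost of the heap insertion, which is what the profiles below capture.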
   
   <img width="1845" alt="image" src="https://user-images.githubusercontent.com/12592133/175871343-ecda138f-43a2-472e-ac42-8efdefb58810.png">
   
   <img width="1848" alt="image" src="https://user-images.githubusercontent.com/12592133/175871415-3d8d9fbd-f140-4a4b-a78d-306c1ec9673c.png">
   
   Profile result without this PR:
   [perf_consumer_0.html.txt](https://github.com/apache/pulsar/files/8989093/perf_consumer_0.html.txt)
   
   Performance test of the maximum message read rate on a single topic:
   
   ```
   bin/pulsar-perf consume test -q 1000000 -p 100000000
   bin/pulsar-perf produce test -r 1000000 -s 1 -mk random -o 10000 -threads 2
   ```
   
   Without this PR (2.10.1):
   
   Profiling started
   2022-06-27T13:44:01,183+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 23919664 msg --- 265702.851  msg/s --- 2.027 Mbit/s  --- Latency: mean: 49430.572 ms - med: 49406 - 95pct: 52853 - 99pct: 53024 - 99.9pct: 53053 - 99.99pct: 53056 - Max: 53057
   2022-06-27T13:44:11,196+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 26690802 msg --- 276759.125  msg/s --- 2.112 Mbit/s  --- Latency: mean: 56106.186 ms - med: 56000 - 95pct: 59289 - 99pct: 59985 - 99.9pct: 60037 - 99.99pct: 60042 - Max: 60042
   2022-06-27T13:44:21,216+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 28788693 msg --- 209467.861  msg/s --- 1.598 Mbit/s  --- Latency: mean: 63523.143 ms - med: 63580 - 95pct: 67202 - 99pct: 67523 - 99.9pct: 67547 - 99.99pct: 67548 - Max: 67548
   2022-06-27T13:44:31,233+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 31255365 msg --- 246190.932  msg/s --- 1.878 Mbit/s  --- Latency: mean: 71152.370 ms - med: 71058 - 95pct: 74555 - 99pct: 74806 - 99.9pct: 74842 - 99.99pct: 74847 - Max: 74847
   2022-06-27T13:44:41,247+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 33606630 msg --- 234769.313  msg/s --- 1.791 Mbit/s  --- Latency: mean: 78636.478 ms - med: 78724 - 95pct: 81694 - 99pct: 82090 - 99.9pct: 82279 - 99.99pct: 82285 - Max: 82286
   
   With this PR:
   
   Profiling started
   2022-06-27T13:56:20,426+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 431272207 msg --- 1079360.516  msg/s --- 8.235 Mbit/s  --- Latency: mean: 272.645 ms - med: 334 - 95pct: 470 - 99pct: 510 - 99.9pct: 522 - 99.99pct: 523 - Max: 524
   2022-06-27T13:56:30,438+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 441292346 msg --- 1000645.852  msg/s --- 7.634 Mbit/s  --- Latency: mean: 15.512 ms - med: 14 - 95pct: 29 - 99pct: 39 - 99.9pct: 54 - 99.99pct: 55 - Max: 55
   2022-06-27T13:56:40,450+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 451303308 msg --- 999973.040  msg/s --- 7.629 Mbit/s  --- Latency: mean: 18.265 ms - med: 14 - 95pct: 53 - 99pct: 98 - 99.9pct: 174 - 99.99pct: 176 - Max: 177
   2022-06-27T13:56:50,462+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 461308082 msg --- 999309.458  msg/s --- 7.624 Mbit/s  --- Latency: mean: 14.728 ms - med: 14 - 95pct: 18 - 99pct: 41 - 99.9pct: 50 - 99.99pct: 51 - Max: 52
   2022-06-27T13:57:00,475+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 471327606 msg --- 1000738.584  msg/s --- 7.635 Mbit/s  --- Latency: mean: 21.291 ms - med: 16 - 95pct: 52 - 99pct: 61 - 99.9pct: 65 - 99.99pct: 66 - Max: 66
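   
   Averaging the five throughput samples from each run gives roughly a 4x improvement. The small Java check below only restates that arithmetic. The values are copied from the log lines above, and the class name is hypothetical:
   
   ```java
   import java.util.Arrays;
   
   public class ThroughputRatio {
       // msg/s samples copied from the five log lines of each run.
       static final double[] WITHOUT_PR = {265702.851, 276759.125, 209467.861, 246190.932, 234769.313};
       static final double[] WITH_PR = {1079360.516, 1000645.852, 999973.040, 999309.458, 1000738.584};
   
       static double mean(double[] samples) {
           return Arrays.stream(samples).average().orElse(Double.NaN);
       }
   
       public static void main(String[] args) {
           double before = mean(WITHOUT_PR); // ~246,578 msg/s
           double after = mean(WITH_PR);     // ~1,016,005 msg/s
           System.out.printf("speedup: %.2fx%n", after / before); // speedup: 4.12x
       }
   }
   ```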
   
   Profile result with this PR:
   
   [perf_consumer_1.html.txt](https://github.com/apache/pulsar/files/8989095/perf_consumer_1.html.txt)
   
   ### Modification
   
   - Change the internal and external executors to plain (non-scheduled) executor services.
   - Add a new `ScheduledExecutorProvider` to handle scheduled tasks.
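   
   The following is a rough illustration of that split, not the merged code. `RoundRobinExecutorProvider` is a hypothetical stand-in for Pulsar's `ExecutorProvider`/`ScheduledExecutorProvider`. The sketch hands out single-threaded executors in round-robin order. It uses plain executors for ordinary tasks and a separate pool of scheduled executors only for timers:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Supplier;
   
   /** Hypothetical provider: N single-threaded executors handed out in round-robin order. */
   public class RoundRobinExecutorProvider<E extends ExecutorService> implements AutoCloseable {
       private final List<E> executors = new ArrayList<>();
       private final AtomicInteger next = new AtomicInteger();
   
       public RoundRobinExecutorProvider(int numThreads, Supplier<E> factory) {
           for (int i = 0; i < numThreads; i++) {
               executors.add(factory.get());
           }
       }
   
       /** floorMod keeps the index non-negative even after the counter overflows. */
       public E getExecutor() {
           return executors.get(Math.floorMod(next.getAndIncrement(), executors.size()));
       }
   
       @Override
       public void close() {
           executors.forEach(ExecutorService::shutdown);
       }
   
       public static void main(String[] args) throws Exception {
           try (RoundRobinExecutorProvider<ExecutorService> internal =
                        new RoundRobinExecutorProvider<>(2, Executors::newSingleThreadExecutor);
                RoundRobinExecutorProvider<ScheduledExecutorService> scheduled =
                        new RoundRobinExecutorProvider<>(2, Executors::newSingleThreadScheduledExecutor)) {
               // Hot-path work: plain submit(), no delay heap involved.
               internal.getExecutor().submit(() -> System.out.println("message task")).get();
               // Timers only, e.g. a check that fires after a delay.
               scheduled.getExecutor().schedule(() -> System.out.println("timer task"), 10, TimeUnit.MILLISECONDS).get();
           }
       }
   }
   ```
   
   A single-thread executor from `Executors.newSingleThreadExecutor()` works off a simple unbounded queue rather than a priority heap. So only the tasks that actually need a delay pay for the heap.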
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #16236:
URL: https://github.com/apache/pulsar/pull/16236#discussion_r910598068


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java:
##########
@@ -193,6 +196,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
                     new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
             this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
                     new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),

Review Comment:
   Thank you @codelipenghui.



[GitHub] [pulsar] Jason918 commented on a diff in pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption

Jason918 commented on code in PR #16236:
URL: https://github.com/apache/pulsar/pull/16236#discussion_r907276620


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1378,10 +1379,11 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            internalPinnedExecutor
-                    .scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
-                            expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
-                            TimeUnit.MILLISECONDS);
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(

Review Comment:
   
   `client.getScheduledExecutorProvider().getExecutor()` picks a thread in round-robin order. Currently, some race conditions are avoided because these tasks run on the single-threaded `internalPinnedExecutor`. Have you checked whether moving them to the new scheduled executor thread could introduce race conditions?
   
   Or could the scheduled task simply hand the work back to `internalPinnedExecutor`?
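   
   The second option can be sketched with JDK types only. The shared scheduled executor acts purely as a timer, and each periodic run re-submits the real work to the consumer's single-threaded executor. All names below are hypothetical, and this is not necessarily what the PR ended up doing:
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicReference;
   
   public class PinnedHandOff {
   
       /** The timer thread only triggers; the periodic work itself always runs on `pinned`. */
       public static ScheduledFuture<?> scheduleOnPinned(ScheduledExecutorService timer, ExecutorService pinned,
                                                         Runnable task, long periodMillis) {
           return timer.scheduleAtFixedRate(() -> pinned.execute(task),
                   periodMillis, periodMillis, TimeUnit.MILLISECONDS);
       }
   
       /** Runs the periodic task a few times and reports which thread executed it. */
       public static String demo() throws InterruptedException {
           ScheduledExecutorService timer =
                   Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "shared-timer"));
           ExecutorService pinned = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer-pinned"));
           CountDownLatch runs = new CountDownLatch(3);
           AtomicReference<String> ranOn = new AtomicReference<>();
           ScheduledFuture<?> handle = scheduleOnPinned(timer, pinned, () -> {
               ranOn.set(Thread.currentThread().getName());
               runs.countDown();
           }, 5);
           runs.await(5, TimeUnit.SECONDS);
           handle.cancel(false);
           timer.shutdown();
           timer.awaitTermination(1, TimeUnit.SECONDS); // no more hand-offs after this point
           pinned.shutdown();
           pinned.awaitTermination(1, TimeUnit.SECONDS);
           return ranOn.get();
       }
   
       public static void main(String[] args) throws InterruptedException {
           System.out.println(demo()); // consumer-pinned
       }
   }
   ```
   
   Because the timer body only enqueues, a task that runs longer than its period can pile up on the pinned executor. A real implementation might need to guard against that.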



[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption

codelipenghui commented on code in PR #16236:
URL: https://github.com/apache/pulsar/pull/16236#discussion_r909625475


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java:
##########
@@ -193,6 +196,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
                     new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
             this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
                     new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),

Review Comment:
   @michaeljmarshall Oh, I missed this part. I will add it.



[GitHub] [pulsar] codelipenghui merged pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption

codelipenghui merged PR #16236:
URL: https://github.com/apache/pulsar/pull/16236


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption

codelipenghui commented on code in PR #16236:
URL: https://github.com/apache/pulsar/pull/16236#discussion_r907336486


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1378,10 +1379,11 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            internalPinnedExecutor
-                    .scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
-                            expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
-                            TimeUnit.MILLISECONDS);
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(

Review Comment:
   Makes sense.



[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption

michaeljmarshall commented on code in PR #16236:
URL: https://github.com/apache/pulsar/pull/16236#discussion_r908049474


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java:
##########
@@ -193,6 +196,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
                     new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
             this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
                     new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),

Review Comment:
   @codelipenghui - I think we might want to provide a way for end users (and brokers) to supply this `ScheduledExecutorProvider`. We do that in the broker for the `externalExecutorProvider` and the `internalExecutorProvider` on the lines above to limit the resources required for multiple clients that run within the broker.
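   
   A minimal sketch of that suggestion follows. It assumes a hypothetical `SketchClient` that accepts an optional `ScheduledExecutorService`, standing in for a supplied `ScheduledExecutorProvider`. The client reuses the supplied executor when one is given, which mirrors the `internalExecutorProvider != null ? ... : new ...` pattern in the diff above. It only shuts down an executor it created itself:
   
   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   
   /** Hypothetical client that accepts an optional, externally owned scheduled executor. */
   public class SketchClient implements AutoCloseable {
       private final ScheduledExecutorService scheduler;
       private final boolean ownsScheduler;
   
       public SketchClient(ScheduledExecutorService suppliedScheduler) {
           // Use the supplied executor if there is one, otherwise create a private one.
           this.ownsScheduler = suppliedScheduler == null;
           this.scheduler = ownsScheduler ? Executors.newSingleThreadScheduledExecutor() : suppliedScheduler;
       }
   
       public ScheduledExecutorService scheduler() {
           return scheduler;
       }
   
       @Override
       public void close() {
           // A shared executor belongs to its owner (e.g. the broker), so only shut down our own.
           if (ownsScheduler) {
               scheduler.shutdown();
           }
       }
   
       public static void main(String[] args) {
           ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
           SketchClient a = new SketchClient(shared);
           SketchClient b = new SketchClient(shared);
           SketchClient standalone = new SketchClient(null);
           a.close();
           b.close();
           standalone.close();
           System.out.println(shared.isShutdown());                 // false
           System.out.println(standalone.scheduler().isShutdown()); // true
           shared.shutdown();
       }
   }
   ```
   
   Many clients inside one process can then share a single timer pool instead of each creating `numIoThreads` scheduled threads of their own.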


