You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/27 06:14:55 UTC
[GitHub] [pulsar] codelipenghui opened a new pull request, #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption
codelipenghui opened a new pull request, #16236:
URL: https://github.com/apache/pulsar/pull/16236
### Motivation
The Scheduled Executor doesn't work very efficiently because each task will add to a DelayedQueue(A priority queue) first even if using the `.execute()` method with any schedule delay.
<img width="1845" alt="image" src="https://user-images.githubusercontent.com/12592133/175871343-ecda138f-43a2-472e-ac42-8efdefb58810.png">
<img width="1848" alt="image" src="https://user-images.githubusercontent.com/12592133/175871415-3d8d9fbd-f140-4a4b-a78d-306c1ec9673c.png">
Profile result:
[perf_consumer_0.html.txt](https://github.com/apache/pulsar/files/8989093/perf_consumer_0.html.txt)
Running a performance test for single topic max message read rate test:
```
bin/pulsar-perf consume test -q 1000000 -p 100000000
bin/pulsar-perf produce test -r 1000000 -s 1 -mk random -o 10000 -threads 2
```
Without this PR (2.10.1):
Profiling started
2022-06-27T13:44:01,183+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 23919664 msg --- 265702.851 msg/s --- 2.027 Mbit/s --- Latency: mean: 49430.572 ms - med: 49406 - 95pct: 52853 - 99pct: 53024 - 99.9pct: 53053 - 99.99pct: 53056 - Max: 53057
2022-06-27T13:44:11,196+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 26690802 msg --- 276759.125 msg/s --- 2.112 Mbit/s --- Latency: mean: 56106.186 ms - med: 56000 - 95pct: 59289 - 99pct: 59985 - 99.9pct: 60037 - 99.99pct: 60042 - Max: 60042
2022-06-27T13:44:21,216+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 28788693 msg --- 209467.861 msg/s --- 1.598 Mbit/s --- Latency: mean: 63523.143 ms - med: 63580 - 95pct: 67202 - 99pct: 67523 - 99.9pct: 67547 - 99.99pct: 67548 - Max: 67548
2022-06-27T13:44:31,233+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 31255365 msg --- 246190.932 msg/s --- 1.878 Mbit/s --- Latency: mean: 71152.370 ms - med: 71058 - 95pct: 74555 - 99pct: 74806 - 99.9pct: 74842 - 99.99pct: 74847 - Max: 74847
2022-06-27T13:44:41,247+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 33606630 msg --- 234769.313 msg/s --- 1.791 Mbit/s --- Latency: mean: 78636.478 ms - med: 78724 - 95pct: 81694 - 99pct: 82090 - 99.9pct: 82279 - 99.99pct: 82285 - Max: 82286
With this PR:
Profiling started
2022-06-27T13:56:20,426+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 431272207 msg --- 1079360.516 msg/s --- 8.235 Mbit/s --- Latency: mean: 272.645 ms - med: 334 - 95pct: 470 - 99pct: 510 - 99.9pct: 522 - 99.99pct: 523 - Max: 524
2022-06-27T13:56:30,438+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 441292346 msg --- 1000645.852 msg/s --- 7.634 Mbit/s --- Latency: mean: 15.512 ms - med: 14 - 95pct: 29 - 99pct: 39 - 99.9pct: 54 - 99.99pct: 55 - Max: 55
2022-06-27T13:56:40,450+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 451303308 msg --- 999973.040 msg/s --- 7.629 Mbit/s --- Latency: mean: 18.265 ms - med: 14 - 95pct: 53 - 99pct: 98 - 99.9pct: 174 - 99.99pct: 176 - Max: 177
2022-06-27T13:56:50,462+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 461308082 msg --- 999309.458 msg/s --- 7.624 Mbit/s --- Latency: mean: 14.728 ms - med: 14 - 95pct: 18 - 99pct: 41 - 99.9pct: 50 - 99.99pct: 51 - Max: 52
2022-06-27T13:57:00,475+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 471327606 msg --- 1000738.584 msg/s --- 7.635 Mbit/s --- Latency: mean: 21.291 ms - med: 16 - 95pct: 52 - 99pct: 61 - 99.9pct: 65 - 99.99pct: 66 - Max: 66
Profile result with this PR:
[perf_consumer_1.html.txt](https://github.com/apache/pulsar/files/8989095/perf_consumer_1.html.txt)
### Modification
- Change internal executor and external executor to normal executor service
- Added a new ScheduledExecutorProvider to handle the scheduled tasks.
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
### Documentation
Check the box below or label this PR directly.
Need to update docs?
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [x] `doc-not-needed`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption
Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #16236:
URL: https://github.com/apache/pulsar/pull/16236#discussion_r910598068
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java:
##########
@@ -193,6 +196,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+ this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
Review Comment:
Thank you @codelipenghui.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] Jason918 commented on a diff in pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption
Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #16236:
URL: https://github.com/apache/pulsar/pull/16236#discussion_r907276620
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1378,10 +1379,11 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
// Lazy task scheduling to expire incomplete chunk message
if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
- internalPinnedExecutor
- .scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
- expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
- TimeUnit.MILLISECONDS);
+ ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
Review Comment:
`client.getScheduledExecutorProvider().getExecutor()` this gets a thread in roundrobin. But currently, some of the race conditions are avoided by using this single thread executor `internalPinnedExecutor`. Have you checked if this new scheduled executor thread would introduce some race conditions?
Or can we just wrap these tasks into `internalPinnedExecutor` again?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption
Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16236:
URL: https://github.com/apache/pulsar/pull/16236#discussion_r909625475
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java:
##########
@@ -193,6 +196,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+ this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
Review Comment:
@michaeljmarshall Oh, I forgot this part. I will add this part.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] codelipenghui merged pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption
Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #16236:
URL: https://github.com/apache/pulsar/pull/16236
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption
Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16236:
URL: https://github.com/apache/pulsar/pull/16236#discussion_r907336486
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1378,10 +1379,11 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
// Lazy task scheduling to expire incomplete chunk message
if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
- internalPinnedExecutor
- .scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
- expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
- TimeUnit.MILLISECONDS);
+ ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
Review Comment:
Make sense.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #16236: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption
Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #16236:
URL: https://github.com/apache/pulsar/pull/16236#discussion_r908049474
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java:
##########
@@ -193,6 +196,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+ this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
Review Comment:
@codelipenghui - I think we might want to provide a way for end users (and brokers) to supply this `ScheduledExecutorProvider`. We do that in the broker for the `externalExecutorProvider` and the `internalExecutorProvider` on the lines above to limit the resources required for multiple clients that run within the broker.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org