You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Mayuresh Gharat (JIRA)" <ji...@apache.org> on 2018/10/25 06:18:00 UTC

[jira] [Created] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

Mayuresh Gharat created KAFKA-7548:
--------------------------------------

             Summary: KafkaConsumer should not throw away already fetched data for paused partitions.
                 Key: KAFKA-7548
                 URL: https://issues.apache.org/jira/browse/KAFKA-7548
             Project: Kafka
          Issue Type: Improvement
          Components: clients
            Reporter: Mayuresh Gharat
            Assignee: Mayuresh Gharat


In KafkaConsumer, when we do a poll, we fetch data asynchronously from kafka brokers that is buffered in completedFetch queue. Now if we pause a few partitions, it seems that in next call to poll we remove the completedFetches for those paused partitions. Normally, if an application is calling pause on topicPartitions, it is likely to return to those topicPartitions in near future and when it does, with the current design we would have to re-fetch that data.

At Linkedin, we made a hotfix to see if NOT throwing away the prefetched data would improve the performance for stream applications like Samza. We ran a benchmark were we compared what is the throughput w.r.t to different values of maxPollRecords.

We had a consumer subscribed to 10 partitions of a high volume topic and paused different number of partitions for every poll call. Here are the results :

*Before fix (records consumed)*
|maxPollRecords->
Number of Partitions
Paused
\|
V|10|5|1|
|0|8605320
(60.022276059 sec)|8337690
(60.026690095 sec)|6424753
(60.000067003 sec)|
|2|101910
(60.006989628 sec)|49350
(60.022598668 sec)|10495
(60.020077555 sec)|
|4|48420
(60.022096537 sec)|24850
(60.007451162 sec)|5004
(60.009773507 sec) |
|6|30420
(60.018380086 sec)|15385
(60.011912135 sec)|3152
(60.013573487 sec)|
|8|23390
(60.043122495 sec)|11390
(60.013297496 sec)|2237
(60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355
(60.015584914 sec)|2087
(60.00319069 sec)|

 

*After fix (records consumed)*
|Number of Partitions
Paused / maxPollRecords|10|5|1|
|0|8662740 (60.011527576 sec)|8203445
(60.022204036 sec)|5846512
(60.0168916 sec)|
|2|8257390
(60.011121061 sec)|7776150
(60.01620875 sec)|5269557
(60.022581248 sec)|
|4|7938510
(60.011829002 sec)|7510140
(60.017571391 sec)|5213496
(60.000230139 sec)|
|6|7100970
(60.007220465 sec)|6382845
(60.038580526 sec)|4519645
(60.000048034 sec)|
|8|6799956 (60.001850171 sec)|6482421
(60.001997219 sec)|4383300 (60.00004836 sec)|
|9|7045177 (60.035366096 sec)|6465839 
(60.000041961 sec)|4884693
(60.000042054 sec)|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)