Posted to dev@kafka.apache.org by "Lucas Bradstreet (Jira)" <ji...@apache.org> on 2019/10/16 04:57:00 UTC

[jira] [Created] (KAFKA-9048) Improve partition scalability in replica fetcher

Lucas Bradstreet created KAFKA-9048:
---------------------------------------

             Summary: Improve partition scalability in replica fetcher
                 Key: KAFKA-9048
                 URL: https://issues.apache.org/jira/browse/KAFKA-9048
             Project: Kafka
          Issue Type: Task
          Components: core
            Reporter: Lucas Bradstreet


https://issues.apache.org/jira/browse/KAFKA-9039 ([https://github.com/apache/kafka/pull/7443]) improves the performance of the replica fetcher (at both small and large numbers of partitions), but it does not improve its complexity or scalability in the number of partitions.

I took a profile using async-profiler for the 1000 partition JMH replica fetcher benchmark. The big remaining culprits are:
 * ~18% looking up logStartOffset
 * ~45% FetchSessionHandler$Builder.add
 * ~19% FetchSessionHandler$Builder.build

*Suggestions*
 #  The logStartOffset is looked up for every partition on each doWork pass. This requires a hashmap lookup even though the logStartOffset changes rarely. If the replica fetcher could be notified of updates to the logStartOffset, then we could reduce the overhead to a function of the number of updates to the logStartOffset instead of O(n) on each pass.
 #  The use of FetchSessionHandler means that we maintain a partitionStates hashmap in the replica fetcher, and a sessionPartitions hashmap in the FetchSessionHandler. On each incremental fetch session pass, we need to reconcile these two hashmaps to determine which partitions were added/updated and which partitions were removed. This reconciliation process is especially expensive, requiring multiple passes over the fetching partitions, and hashmap removes and puts for most partitions. The replica fetcher could be smarter by directly maintaining the fetch session's *updated* hashmap of FetchRequest.PartitionData(s), as well as a *removed* partitions list, so that these do not need to be generated by reconciliation on each fetch pass.
 #  maybeTruncate requires an O(n) pass over the elements in partitionStates even if there are no partitions in the truncating state. If we could maintain some additional state about whether truncating partitions exist in partitionStates, or if we could separate these states into a separate data structure, we would not need to iterate across all partitions on every doWork pass. I’ve seen clusters where this work takes about 0.5%-1% of CPU, which is minor but will become more substantial as the number of partitions increases.
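
To make suggestions 2 and 3 concrete, here is a minimal sketch of the bookkeeping they describe. It is not Kafka code: the class name FetchChangeTracker and all of its methods are hypothetical, partitions are plain strings rather than TopicPartition, and a long fetch offset stands in for FetchRequest.PartitionData. The idea is only that the fetcher records changes as they happen, so each pass costs a function of the number of changes rather than O(n) in the number of partitions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not a Kafka class. Tracks per-pass deltas instead of
// reconciling two full hashmaps on every fetch pass.
final class FetchChangeTracker {
    // Partitions added or updated since the last drain (partition -> fetch offset).
    private final Map<String, Long> updated = new HashMap<>();
    // Partitions removed since the last drain.
    private final Set<String> removed = new HashSet<>();
    // Partitions currently in the truncating state (suggestion 3).
    private final Set<String> truncating = new HashSet<>();

    void addOrUpdate(String partition, long fetchOffset) {
        removed.remove(partition);
        updated.put(partition, fetchOffset);
    }

    // Simplification: a partition added and removed between two drains is
    // still reported as removed, even if the session never saw it.
    void remove(String partition) {
        updated.remove(partition);
        truncating.remove(partition);
        removed.add(partition);
    }

    void markTruncating(String partition) {
        truncating.add(partition);
    }

    void markFetching(String partition) {
        truncating.remove(partition);
    }

    // O(1) check that lets the caller skip the truncation pass entirely.
    boolean hasTruncatingPartitions() {
        return !truncating.isEmpty();
    }

    // Returns the pending updates and resets them; cost is O(number of changes).
    Map<String, Long> drainUpdated() {
        Map<String, Long> snapshot = new HashMap<>(updated);
        updated.clear();
        return snapshot;
    }

    // Returns the pending removals and resets them; cost is O(number of changes).
    List<String> drainRemoved() {
        List<String> snapshot = new ArrayList<>(removed);
        removed.clear();
        return snapshot;
    }
}
```

In this sketch, a doWork-style loop would call hasTruncatingPartitions() before doing any truncation work, and would build each incremental fetch from drainUpdated() and drainRemoved() rather than by comparing partitionStates against sessionPartitions. A steady-state pass with no changes then drains two empty collections.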



--
This message was sent by Atlassian Jira
(v8.3.4#803005)