Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2016/07/27 00:27:21 UTC

[jira] [Updated] (KAFKA-3428) Remove metadata sync bottleneck from mirrormaker's producer

     [ https://issues.apache.org/jira/browse/KAFKA-3428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3428:
-------------------------------
    Fix Version/s:     (was: 0.10.0.1)
                   0.10.1.0

> Remove metadata sync bottleneck from mirrormaker's producer
> -----------------------------------------------------------
>
>                 Key: KAFKA-3428
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3428
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.9.0.1
>            Reporter: Maysam Yabandeh
>             Fix For: 0.10.1.0
>
>
> Due to synchronization on the single producer, MM in a setup with 32 consumer threads could not send more than
> 358k msg/sec and hence could not saturate the NIC. Profiling showed that producer.send takes 0.080 ms on average, which explains the 358k msg/sec ceiling. The following explains the bottleneck in producer.send and suggests how to improve it.
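The arithmetic behind that ceiling can be checked with the figures above (a back-of-the-envelope sketch, not part of the original report): at 0.080 ms per send, each thread tops out near 12,500 msg/sec, and 32 threads near 400k msg/sec, consistent with the observed 358k.

```java
// Back-of-the-envelope check of the reported throughput ceiling
// (0.080 ms per send, 32 threads -- figures taken from the text above).
public class ThroughputCeiling {
    public static double ceilingMsgPerSec(double sendMillis, int threads) {
        double perThread = 1000.0 / sendMillis; // 0.080 ms per send -> 12,500 msg/sec
        return perThread * threads;             // 32 threads -> ~400,000 msg/sec
    }

    public static void main(String[] args) {
        System.out.printf("ceiling: %.0f msg/sec%n", ceilingMsgPerSec(0.080, 32));
    }
}
```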
> The current implementation of MM relies on a single producer. For EACH message, producer.send() calls waitOnMetadata, which runs the following code, whose methods are synchronized:
> {code}
>         // add topic to metadata topic list if it is not there already.
>         if (!this.metadata.containsTopic(topic))
>             this.metadata.add(topic);
> {code}
> Although this code is mostly a no-op, containsTopic is synchronized, so it becomes the bottleneck in MM.
> Profiling highlights this bottleneck:
> {code}
> 100.0% - 65,539 ms kafka.tools.MirrorMaker$MirrorMakerThread.run
>   18.9% - 12,403 ms org.apache.kafka.clients.producer.KafkaProducer.send
>   13.8% - 9,056 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>   12.1% - 7,933 ms org.apache.kafka.clients.Metadata.containsTopic
>   1.7% - 1,088 ms org.apache.kafka.clients.Metadata.fetch
>   2.6% - 1,729 ms org.apache.kafka.clients.Metadata.fetch
>   2.2% - 1,442 ms org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> {code}
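For illustration, the check-then-add pair above can collapse into a single lock-free call on a concurrent set. This is only a sketch with hypothetical names, not Kafka's actual Metadata class:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: a ConcurrentHashMap-backed set makes both the membership check and
// the add lock-free, so no synchronized block is needed on the hot path.
// TopicRegistry and its methods are illustrative, not Kafka's API.
public class TopicRegistry {
    private final Set<String> topics = ConcurrentHashMap.newKeySet();

    // Set.add is atomic: it returns true only for the thread that inserted the topic.
    public boolean addIfAbsent(String topic) {
        return topics.add(topic);
    }

    public boolean containsTopic(String topic) {
        return topics.contains(topic); // lock-free read
    }
}
```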
> After replacing this bottleneck with a no-op, another profiler run shows that fetch is the next bottleneck:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy	 132 s (54 %)	n/a	n/a
> 	java.lang.Thread.run	 50,776 ms (21 %)	n/a	n/a
> 	org.apache.kafka.clients.Metadata.fetch	 20,881 ms (8 %)	n/a	n/a
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.send
>   6.8% - 16,546 ms kafka.tools.MirrorMaker$MirrorMakerProducer.send
> {code}
> However, the fetch method does not need to be synchronized:
> {code}
>     public synchronized Cluster fetch() {
>         return this.cluster;
>     }
> {code}
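One way to drop that lock safely, assuming the Cluster object is immutable and only ever replaced wholesale, is a volatile field. The classes below are illustrative stand-ins, not Kafka's real Metadata:

```java
// Sketch: a volatile reference gives readers a lock-free, always-consistent
// snapshot, assuming writers swap in a whole new object rather than mutating
// the current one. MetadataSketch and ClusterSnapshot are illustrative names.
public class MetadataSketch {
    private volatile ClusterSnapshot cluster = new ClusterSnapshot("initial");

    public ClusterSnapshot fetch() { // no synchronized needed
        return this.cluster;         // volatile read: a safe reference to one snapshot
    }

    public synchronized void update(ClusterSnapshot next) {
        this.cluster = next;         // volatile write publishes the new snapshot
    }

    /** Stand-in for Kafka's immutable Cluster (illustrative only). */
    public static final class ClusterSnapshot {
        public final String id;
        public ClusterSnapshot(String id) { this.id = id; }
    }
}
```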
> Removing the sync from the fetch method shows that the bottleneck disappears:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy	 249 s (78 %)	n/a	n/a
> 	org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel	 24,489 ms (7 %)	n/a	n/a
> 	org.xerial.snappy.SnappyNative.rawUncompress	 17,024 ms (5 %)	n/a	n/a
> 	org.apache.kafka.clients.producer.internals.RecordAccumulator.append	 13,817 ms (4 %)	n/a	n/a
>   4.3% - 13,817 ms org.apache.kafka.clients.producer.KafkaProducer.send
> {code}
> Internally we have applied a patch to remove this bottleneck. The patch does the following:
> 1. replace the HashSet with a concurrent hash set
> 2. remove sync from containsTopic and fetch
> 3. pass a copy of the topics to getClusterForCurrentTopics, since this synchronized method accesses topics at two locations, and the topics changing in between could break the semantics.
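Point 3 can be sketched as follows; the class and method names are illustrative stand-ins for the real Metadata.getClusterForCurrentTopics:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of point 3: snapshot the concurrent topic set once on entry, so both
// accesses inside the method see the same topics even if another thread adds a
// topic mid-call. Class and method names are illustrative, not Kafka's API.
public class MetadataPatchSketch {
    private final Set<String> topics = ConcurrentHashMap.newKeySet();

    public void add(String topic) {
        topics.add(topic);
    }

    public String getClusterForCurrentTopics() {
        // Copy once; the snapshot cannot change underneath us.
        Set<String> snapshot = new HashSet<>(topics);
        int sizeBefore = snapshot.size(); // first access
        // ... build the cluster view from the snapshot here ...
        int sizeAfter = snapshot.size();  // second access: guaranteed to match
        return "cluster over " + sizeBefore + "/" + sizeAfter + " topics";
    }
}
```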
> Any interest in applying this patch? Any alternative suggestions?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)