Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/07/03 19:37:00 UTC

[jira] [Commented] (FLINK-6301) Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug

    [ https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072843#comment-16072843 ] 

ASF GitHub Bot commented on FLINK-6301:
---------------------------------------

Github user vidhu5269 commented on the issue:

    https://github.com/apache/flink/pull/4015
  
    Hi @tzulitai 
    
    Apologies for such a long delay. It took me quite a while to come back to this.
    
    I ran the updated connector on the cluster and didn't see any dependency conflicts. The job using it was reading from a gzipped Avro topic and producing into two different topics: a text topic and an Avro topic. Both consumption and production worked as expected.
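    
    For reference, below is a minimal sketch of the kind of verification job described above (assuming the 0.10 connector, since the diff further down bumps kafka-clients to 0.10.2.0). The broker address, topic names, and group id are placeholders, and it uses string schemas for brevity; the actual job used Avro (de)serialization.
    ```
    import java.util.Properties;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    
    public class KafkaConnectorVerificationJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092");    // placeholder
            props.setProperty("group.id", "connector-verification");  // placeholder
    
            // The source topic carries gzip-compressed messages; decompression happens
            // inside the Kafka consumer, which is where the Inflater leak used to show up.
            DataStream<String> input = env.addSource(
                    new FlinkKafkaConsumer010<>("source-topic", new SimpleStringSchema(), props));
    
            // Fan out to two sink topics, as in the verification job.
            input.addSink(new FlinkKafkaProducer010<>("broker:9092", "text-topic", new SimpleStringSchema()));
            input.addSink(new FlinkKafkaProducer010<>("broker:9092", "avro-topic", new SimpleStringSchema()));
    
            env.execute("kafka 0.10 connector verification");
        }
    }
    ```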
    
    The test was done on a standalone cluster with 2 workers and 1 master, with 8 slots per worker. The job was using 10 slots and was running on both the workers. Each worker had 8 vCPUs and 8 GB of RAM. 
    
    With this job, I also verified that there wasn't any memory leak with the `kafka-clients` version change.
    
    Following is the diff of the `mvn dependency:tree` output between master and the version-change commit:
    ```
    20c20
    < [INFO] +- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
    ---
    > [INFO] +- org.apache.kafka:kafka-clients:jar:0.10.2.0:compile
    27,28c27,28
    < [INFO] +- org.apache.kafka:kafka_2.11:jar:0.10.0.1:test
    < [INFO] |  +- com.101tec:zkclient:jar:0.8:test
    ---
    > [INFO] +- org.apache.kafka:kafka_2.11:jar:0.10.2.0:test
    > [INFO] |  +- net.sf.jopt-simple:jopt-simple:jar:5.0.3:test
    31,35c31,33
    < [INFO] |  +- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:test
    < [INFO] |  +- net.sf.jopt-simple:jopt-simple:jar:4.9:test
    < [INFO] |  \- org.apache.zookeeper:zookeeper:jar:3.4.6:provided
    < [INFO] |     +- jline:jline:jar:0.9.94:provided
    < [INFO] |     \- io.netty:netty:jar:3.7.0.Final:provided
    ---
    > [INFO] |  +- com.101tec:zkclient:jar:0.10:test
    > [INFO] |  +- org.apache.zookeeper:zookeeper:jar:3.4.6:provided
    > [INFO] |  \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:test
    70a69
    > [INFO] |  |  +- io.netty:netty:jar:3.8.0.Final:provided
    ```
    From what we can see here, apart from the new kafka-clients library, there are a few changes coming from its tests-jar as well:
    1. The tests-jar pulls in new versions of `net.sf.jopt-simple:jopt-simple` (5.0.3) and `com.101tec:zkclient` (0.10) in `test` scope. 
    2. `jline:jline` is no longer pulled in by the new version.
    3. `io.netty:netty` `3.8.0.Final` is being pulled in (from `flakka-remote`) instead of `3.7.0.Final`. 
    
    Since the job and the unit tests worked as expected, I am assuming that these dependency changes are not breaking anything. Do tell me if I am missing something here.
    
    Although these are not new dependencies, I still verified that `zkclient` and `netty` are both licensed under ASL 2.0, whereas `jopt-simple` has an `MIT` license, which is also compatible with ASL 2.0. So, we should be good as far as licensing goes.


> Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6301
>                 URL: https://issues.apache.org/jira/browse/FLINK-6301
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.0, 1.1.3, 1.1.4
>            Reporter: Rahul Yadav
>            Assignee: Rahul Yadav
>             Fix For: 1.2.2, 1.4.0
>
>         Attachments: jeprof.24611.1228.i1228.heap.svg, jeprof.24611.1695.i1695.heap.svg, jeprof.24611.265.i265.heap.svg, jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg, jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg, POSTFIX.jeprof.14880.1944.i1944.heap.svg, POSTFIX.jeprof.14880.4129.i4129.heap.svg, POSTFIX.jeprof.14880.961.i961.heap.svg, POSTFIX.jeprof.14880.99.i99.heap.svg, POSTFIX.jeprof.14880.9.i9.heap.svg
>
>
> Hi
> We are running Flink on a standalone cluster with 8 TaskManagers having 8 vCPUs and 8 slots each. Each host has 16 GB of RAM.
> In our jobs, 
> # We are consuming gzip compressed messages from Kafka using *KafkaConnector09* and use *rocksDB* backend for checkpoint storage.
> # To debug the leak, we used *jemalloc and jeprof* to profile the sources of malloc calls from the Java process; attached are the profiles generated at various stages of the job. As we can see, apart from os.malloc and rocksDB.allocateNewBlock, there are additional malloc calls coming from the inflate() method of java.util.zip.Inflater. These calls are innocuous as long as the Inflater.end() method is called after its use.
> # To find the callers of the inflate() method, we used BTrace on the running process to dump the caller stack on each call. Following is the stack trace we got: 
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
> The end() method on Inflater is called inside the close() method of *InflaterInputStream* (extended by *GZIPInputStream*), but looking through the Kafka consumer code, we found that RecordsIterator does not close the compressor stream after use, hence the memory leak:
> https://github.com/apache/kafka/blob/23c69d62a0cabf06c4db8b338f0ca824dc6d81a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L210
> https://issues.apache.org/jira/browse/KAFKA-3937 was filed for this and the issue was fixed in 0.10.1.0 but not back-ported to previous versions.
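> To illustrate the underlying JDK behaviour, here is a minimal standalone sketch (not the actual Kafka consumer code; the class and method names are arbitrary): GZIPInputStream owns a native zlib Inflater whose buffers are only released by Inflater.end(), which close() calls on our behalf, so a stream that is never closed holds native memory until finalization.
> {code}
> import java.io.ByteArrayInputStream;
> import java.io.IOException;
> import java.util.zip.GZIPInputStream;
>
> public class InflaterCloseSketch {
>
>     // Correct usage: try-with-resources calls close(), which calls Inflater.end()
>     // and frees the native zlib buffers immediately.
>     static void readAndClose(byte[] gzipped) throws IOException {
>         try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
>             byte[] buf = new byte[4096];
>             while (in.read(buf) != -1) {
>                 // consume decompressed bytes
>             }
>         }
>     }
>
>     // Leaky pattern, analogous to what the pre-0.10.1.0 RecordsIterator does:
>     // the stream is abandoned without close(), so the native memory is held
>     // until the finalizer eventually runs, if it runs at all.
>     static void readAndLeak(byte[] gzipped) throws IOException {
>         GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped));
>         byte[] buf = new byte[4096];
>         while (in.read(buf) != -1) {
>             // consume decompressed bytes
>         }
>         // missing in.close(), so Inflater.end() is never called explicitly
>     }
> }
> {code}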
> So, I would assume that we have two paths from here: 
> 1. Wait for the fix to be back-ported to the 0.9.x Kafka consumer and then update the kafka-clients dependency:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.9/pom.xml#L40
> 2. Update the kafka-connector10 to use the 0.10.1.0 clients library instead of 0.10.0.1:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.10/pom.xml#L40
> Apart from master, also back-port the changes to 1.2.x for Kafka connector 10, and to all the 1.x branches for Kafka connector 09.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)