Posted to jira@kafka.apache.org by "Brett Rann (JIRA)" <ji...@apache.org> on 2017/11/09 00:35:00 UTC

[jira] [Comment Edited] (KAFKA-6185) Selector memory leak with high likelihood of OOM in case of down conversion

    [ https://issues.apache.org/jira/browse/KAFKA-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16244999#comment-16244999 ] 

Brett Rann edited comment on KAFKA-6185 at 11/9/17 12:34 AM:
-------------------------------------------------------------

Moved this to Slack last night to make the most of the time. Updating the ticket from our chats:

{quote}
1. Seems like compression is not being used. Is that correct?
2. Do you know what is the average message size?
{quote}

1. We definitely have very active producers that compress, and I know there are some that explicitly don't. We don't set compression in the topic config.
2. Our busiest topic has messages of up to 2MB, but most are well below that. On average they may well be larger than what is typical for Kafka. Other topics are more 'normal'.
On that test cluster, the peak in the last hour was 1750 messages at 1100KB, so about 628 bytes per message. The minimum was 370 messages at 140KB, so about 380 bytes per message.
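
For reference, the per-message averages above are just the byte total divided by the message count. A minimal sketch of that arithmetic, assuming 1KB = 1000 bytes and that each pair of figures covers the same interval (the function name is only for illustration):

```python
def avg_message_bytes(total_kb, messages):
    """Average bytes per message, assuming 1KB = 1000 bytes."""
    return total_kb * 1000 / messages


# Figures quoted in the comment above.
peak = avg_message_bytes(1100, 1750)
minimum = avg_message_bytes(140, 370)

print(f"peak: {peak:.1f} bytes, minimum: {minimum:.1f} bytes")
```

Both results land within a couple of bytes of the rounded figures quoted above.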

{quote}
One more question: no quotas are set, right?
{quote}

Right.

{quote}
Can you enable heap dump on OOM? It will help to track what objects are leaking and what is holding the reference. Thank you!
{quote}

Done. It will kick in on the next restart. I'm still checking status this morning, but I'll restart one of them.

I also sent you a dump last night for the affected broker:
{noformat}
jmap -dump:live,format=b,file=heap.bin <pid>
{noformat}

And added some metrics:
{noformat}
MBean: kafka.network:type=RequestMetrics,name=RequestBytes,request=<apiKey>
MBean: kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request=<apiKey>
MBean: kafka.server:type=BrokerTopicMetrics,name=FetchMessageConversionsPerSec,topic=([-.\w]+)
MBean: kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec,topic=([-.\w]+)
{noformat}

Where <apiKey> was (Fetch|Produce)
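
To illustrate which MBean names that list covers, here is a rough sketch that expands <apiKey> and applies the topic pattern as a regular expression. It assumes the `topic=([-.\w]+)` suffix is a regex the metrics collector uses to capture the topic name, which the ticket does not state. The helper names and the sample MBean are made up for the example; the topic name is taken from the fetch request in this ticket.

```python
import re

API_KEYS = ("Fetch", "Produce")  # the two <apiKey> values mentioned above


def request_metric_mbeans():
    """Expand <apiKey> for the two RequestMetrics MBeans."""
    return [
        f"kafka.network:type=RequestMetrics,name={name},request={key}"
        for name in ("RequestBytes", "TemporaryMemoryBytes")
        for key in API_KEYS
    ]


# Topic-level conversion metrics; the groups capture the direction and the topic.
CONVERSIONS = re.compile(
    r"kafka\.server:type=BrokerTopicMetrics,"
    r"name=(Fetch|Produce)MessageConversionsPerSec,topic=([-.\w]+)"
)


def parse_conversion_mbean(mbean):
    """Return (direction, topic), or None if the name does not match."""
    m = CONVERSIONS.fullmatch(mbean)
    return (m.group(1), m.group(2)) if m else None


for name in request_metric_mbeans():
    print(name)

# Hypothetical MBean name, for illustration only.
sample = (
    "kafka.server:type=BrokerTopicMetrics,"
    "name=FetchMessageConversionsPerSec,topic=maxwell.users"
)
print(parse_conversion_mbean(sample))
```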

Let me know when you would like us to build and test your patch. Thanks!


> Selector memory leak with high likelihood of OOM in case of down conversion
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6185
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6185
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.0.0
>         Environment: Ubuntu 14.04.5 LTS
> 5 brokers: 1&2 on 1.0.0 3,4,5 on 0.11.0.1
> inter.broker.protocol.version=0.11.0.1
> log.message.format.version=0.11.0.1
> clients a mix of 0.9, 0.10, 0.11
>            Reporter: Brett Rann
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>              Labels: regression
>             Fix For: 1.0.1
>
>         Attachments: Kafka_Internals___Datadog.png, Kafka_Internals___Datadog.png
>
>
> We are testing 1.0.0 in a couple of environments.
> Both have about 5 brokers, with two 1.0.0 brokers and the rest 0.11.0.1 brokers.
> One is using on-disk message format 0.9.0.1, the other 0.11.0.1.
> We have 0.9, 0.10, and 0.11 clients connecting.
> The cluster on the 0.9.0.1 format has been running fine for a week.
> But the cluster on the 0.11.0.1 format is consistently having memory issues, only on the two upgraded brokers running 1.0.0.
> The first occurrence of the error comes along with this stack trace:
> {noformat}
> {"timestamp":"2017-11-06 14:22:32,402","level":"ERROR","logger":"kafka.server.KafkaApis","thread":"kafka-request-handler-7","message":"[KafkaApi-1] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=maxwell.users,partitions=[{partition=0,fetch_offset=227537,max_bytes=11000000},{partition=4,fetch_offset=354468,max_bytes=11000000},{partition=5,fetch_offset=266524,max_bytes=11000000},{partition=8,fetch_offset=324562,max_bytes=11000000},{partition=10,fetch_offset=292931,max_bytes=11000000},{partition=12,fetch_offset=325718,max_bytes=11000000},{partition=15,fetch_offset=229036,max_bytes=11000000}]}]}"}
> java.lang.OutOfMemoryError: Java heap space
>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>         at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
>         at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
>         at scala.Option.map(Option.scala:146)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
>         at scala.Option.flatMap(Option.scala:171)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>         at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
>         at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
>         at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
>         at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
>         at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>         at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
> {noformat}
> And then after a few of those it settles into this kind of pattern:
> {noformat}
> {"timestamp":"2017-11-06 15:06:48,114","level":"ERROR","logger":"kafka.server.KafkaApis","thread":"kafka-request-handler-1","message":"[KafkaApi-1] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=maxwell.accounts,partitions=[{partition=4,fetch_offset=560631,max_bytes=11000000},{partition=8,fetch_offset=557589,max_bytes=11000000},{partition=12,fetch_offset=551712,max_bytes=11000000}]}]}"}
> java.lang.OutOfMemoryError: Java heap space
> {"timestamp":"2017-11-06 15:06:48,811","level":"ERROR","logger":"kafka.server.KafkaApis","thread":"kafka-request-handler-7","message":"[KafkaApi-1] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=maxwell.accounts,partitions=[{partition=4,fetch_offset=560631,max_bytes=11000000},{partition=8,fetch_offset=557589,max_bytes=11000000},{partition=12,fetch_offset=551712,max_bytes=11000000}]}]}"}
> java.lang.OutOfMemoryError: Java heap space
> {noformat}
> I've attached the heap use graphs. Heap use steadily increases to the max, at which point the error starts appearing.
> I've tripled the heap space for one of the 1.0.0 hosts to see what happens, and it similarly climbs to near 6gb, then similarly starts throwing java.lang.OutOfMemoryError. I've attached those heap space graphs also, where the line that starts climbing from 2gb is from when it was restarted with a 6gb heap. The out of memory errors started right at the peak, where the line flatlines.
> Here's a snippet from the broker logs: https://gist.github.com/brettrann/4bb8041e884a299b7b0b12645a04492d
> I've redacted some group names because I'd need to check with the teams about making them public. Let me know what more is needed and I can gather it. This is a test cluster and the problem appears to be easily reproducible. Happy to gather as much info as needed.
> Our config is: 
> {noformat}
> broker.id=2
> delete.topic.enable=true
> auto.create.topics.enable=false
> auto.leader.rebalance.enable=true
> inter.broker.protocol.version=0.11.0.1
> log.message.format.version=0.11.0.1
> group.max.session.timeout.ms = 300000
> port=9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> replica.fetch.max.bytes=10485760
> log.dirs=/data/kafka/logs
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> log.retention.hours=168
> offsets.retention.minutes=10080
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> log.cleaner.enable=true
> zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181/kafka
> zookeeper.connection.timeout.ms=6000
> {noformat}
> This was also reported in a comment at the end of KAFKA-6042 (https://issues.apache.org/jira/browse/KAFKA-6042), which is a broker lockup/FD issue, but a new ticket was requested.
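
A rough back-of-the-envelope on the first failing fetch request in the description: it asks for 7 partitions of maxwell.users with max_bytes=11000000 each. The sketch below just sums those limits from the logged request text, which comes to 77000000 bytes (about 77MB). Treating that sum as an upper bound on what a single down-converted fetch response could need on heap is an assumption, not something the ticket confirms.

```python
import re

# Partition list copied from the first logged fetch request (topic maxwell.users).
request = (
    "partitions=[{partition=0,fetch_offset=227537,max_bytes=11000000},"
    "{partition=4,fetch_offset=354468,max_bytes=11000000},"
    "{partition=5,fetch_offset=266524,max_bytes=11000000},"
    "{partition=8,fetch_offset=324562,max_bytes=11000000},"
    "{partition=10,fetch_offset=292931,max_bytes=11000000},"
    "{partition=12,fetch_offset=325718,max_bytes=11000000},"
    "{partition=15,fetch_offset=229036,max_bytes=11000000}]"
)

# Collect every per-partition max_bytes value and add them up.
limits = [int(v) for v in re.findall(r"max_bytes=(\d+)", request)]
total = sum(limits)

print(f"{len(limits)} partitions, up to {total} bytes in one fetch response")
```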


