You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Badai Aqrandista (JIRA)" <ji...@apache.org> on 2019/06/17 01:20:00 UTC
[jira] [Created] (KAFKA-8546) Call System#runFinalization to avoid
memory leak caused by JDK-6293787
Badai Aqrandista created KAFKA-8546:
---------------------------------------
Summary: Call System#runFinalization to avoid memory leak caused by JDK-6293787
Key: KAFKA-8546
URL: https://issues.apache.org/jira/browse/KAFKA-8546
Project: Kafka
Issue Type: Bug
Affects Versions: 2.0.1
Reporter: Badai Aqrandista
When a heavily used broker uses gzip compression on all topics, sometime you can hit GC pauses greater than zookeeper.session.timeout.ms of 6000ms. This is caused by memory leak caused by JDK-6293787 ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is caused by JDK-4797189 ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
In summary, this is what happen:
* Inflater class contains finalizer method.
* Whenever a class with finalizer method is instantiated, a Finalizer object is created.
* GC finalizer thread is responsible to process all Finalizer objects.
* If the rate of Finalizer object creation exceed the rate of GC finalizer thread ability to process it, Finalizer object number grows continuously, and eventually triggers full GC (because it is stored in Old Gen).
Following stack trace shows what happen when a process is frozen doing full GC:
{code:java}
kafka-request-handler-13 Runnable Thread ID: 79
java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
java.util.zip.InflaterInputStream.read(byte[], int, int) InflaterInputStream.java:152
java.util.zip.GZIPInputStream.read(byte[], int, int) GZIPInputStream.java:117
java.io.BufferedInputStream.fill() BufferedInputStream.java:246
java.io.BufferedInputStream.read() BufferedInputStream.java:265
java.io.DataInputStream.readByte() DataInputStream.java:265
org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) ByteUtils.java:168
org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, long, int, Long) DefaultRecord.java:292
org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, int, Long) DefaultRecordBatch.java:264
org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:563
org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:532
org.apache.kafka.common.record.DefaultRecordBatch.iterator() DefaultRecordBatch.java:327
scala.collection.convert.Wrappers$JIterableWrapper.iterator() Wrappers.scala:54
scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch) LogValidator.scala:267
kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object) LogValidator.scala:259
scala.collection.Iterator$class.foreach(Iterator, Function1) Iterator.scala:891
scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:259
kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:70
kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, LongRef, long) Log.scala:771
kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
kafka.cluster.Partition.appendRecordsToLeader(MemoryRecords, boolean, int) Partition.scala:647
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Tuple2) ReplicaManager.scala:745
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Object) ReplicaManager.scala:733
scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(DefaultEntry) HashMap.scala:130
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(Object) HashMap.scala:130
scala.collection.mutable.HashTable$class.foreachEntry(HashTable, Function1) HashTable.scala:236
scala.collection.mutable.HashMap.foreachEntry(Function1) HashMap.scala:40
scala.collection.mutable.HashMap.foreach(Function1) HashMap.scala:130
scala.collection.TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) TraversableLike.scala:234
scala.collection.AbstractTraversable.map(Function1, CanBuildFrom) Traversable.scala:104
kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, Map, short) ReplicaManager.scala:733
kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, Map, Function1, Option, Function1) ReplicaManager.scala:471
kafka.server.KafkaApis.handleProduceRequest(RequestChannel$Request) KafkaApis.scala:489
kafka.server.KafkaApis.handle(RequestChannel$Request) KafkaApis.scala:113
kafka.server.KafkaRequestHandler.run() KafkaRequestHandler.scala:69
java.lang.Thread.run() Thread.java:748
{code}
And following screenshot shows that the process heap is full of Finalizer objects.
!Screen Shot 2019-05-30 at 1.27.25 pm.png!
I think Kafka needs to manually call System$runFinalization method after closing GZip input stream to avoid this full GC.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)