Posted to dev@kafka.apache.org by "Badai Aqrandista (JIRA)" <ji...@apache.org> on 2019/06/17 01:20:00 UTC

[jira] [Created] (KAFKA-8546) Call System#runFinalization to avoid memory leak caused by JDK-6293787

Badai Aqrandista created KAFKA-8546:
---------------------------------------

             Summary: Call System#runFinalization to avoid memory leak caused by JDK-6293787
                 Key: KAFKA-8546
                 URL: https://issues.apache.org/jira/browse/KAFKA-8546
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.0.1
            Reporter: Badai Aqrandista


When a heavily used broker uses gzip compression on all topics, you can sometimes hit GC pauses greater than zookeeper.session.timeout.ms of 6000ms. This is caused by a memory leak described in JDK-6293787 ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which in turn stems from JDK-4797189 ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).

 

In summary, this is what happens:
 * The Inflater class has a finalize() method.
 * Whenever a class with a finalize() method is instantiated, a Finalizer object is created.
 * The GC finalizer thread is responsible for processing all Finalizer objects.
 * If the rate of Finalizer object creation exceeds the rate at which the GC finalizer thread can process them, the number of Finalizer objects grows continuously and eventually triggers a full GC (because they are stored in the old generation).
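The mechanism above can be demonstrated with a minimal, self-contained sketch (class and method names here are illustrative, not from the JDK or Kafka source). Every instance of a class that overrides finalize() gets a Finalizer entry at allocation time, and that entry is only drained when the finalizer thread runs:

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public class FinalizerPressure {
    static final AtomicInteger finalized = new AtomicInteger();

    // Any class overriding finalize() causes the JVM to register a
    // java.lang.ref.Finalizer entry per instance at allocation time.
    // The single finalizer thread must process each entry before the
    // object's memory can actually be reclaimed.
    static class Finalizable {
        @Override
        protected void finalize() {
            finalized.incrementAndGet();
        }
    }

    // Allocate many short-lived finalizable objects, then explicitly ask
    // the JVM to collect and drain the pending Finalizer entries.
    public static int allocateAndDrain(int count) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            new Finalizable(); // immediately unreachable, but not yet reclaimable
        }
        // Without these calls, the Finalizer entries linger until GC and the
        // finalizer thread eventually get to them -- the backlog this issue
        // describes.
        System.gc();
        System.runFinalization();
        Thread.sleep(100); // give the finalizer thread a moment to run
        return finalized.get();
    }
}
{code}

If allocation outpaces the finalizer thread (as with one Inflater per gzip-compressed batch on a busy broker), the backlog accumulates in the old generation until a full GC is forced.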

 

The following stack trace shows what happens when a process is frozen doing a full GC:

 
{code:java}
kafka-request-handler-13  Runnable Thread ID: 79
  java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
  java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
  java.util.zip.InflaterInputStream.read(byte[], int, int) InflaterInputStream.java:152
  java.util.zip.GZIPInputStream.read(byte[], int, int) GZIPInputStream.java:117
  java.io.BufferedInputStream.fill() BufferedInputStream.java:246
  java.io.BufferedInputStream.read() BufferedInputStream.java:265
  java.io.DataInputStream.readByte() DataInputStream.java:265
  org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) ByteUtils.java:168
  org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, long, int, Long) DefaultRecord.java:292
  org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, int, Long) DefaultRecordBatch.java:264
  org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:563
  org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:532
  org.apache.kafka.common.record.DefaultRecordBatch.iterator() DefaultRecordBatch.java:327
  scala.collection.convert.Wrappers$JIterableWrapper.iterator() Wrappers.scala:54
  scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
  kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch) LogValidator.scala:267
  kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object) LogValidator.scala:259
  scala.collection.Iterator$class.foreach(Iterator, Function1) Iterator.scala:891
  scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
  scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
  kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:259
  kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:70
  kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, LongRef, long) Log.scala:771
  kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
  kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
  kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
  kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
  kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
  kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
  kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
  kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
  kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
  kafka.cluster.Partition.appendRecordsToLeader(MemoryRecords, boolean, int) Partition.scala:647
  kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Tuple2) ReplicaManager.scala:745
  kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Object) ReplicaManager.scala:733
  scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
  scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
  scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(DefaultEntry) HashMap.scala:130
  scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(Object) HashMap.scala:130
  scala.collection.mutable.HashTable$class.foreachEntry(HashTable, Function1) HashTable.scala:236
  scala.collection.mutable.HashMap.foreachEntry(Function1) HashMap.scala:40
  scala.collection.mutable.HashMap.foreach(Function1) HashMap.scala:130
  scala.collection.TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) TraversableLike.scala:234
  scala.collection.AbstractTraversable.map(Function1, CanBuildFrom) Traversable.scala:104
  kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, Map, short) ReplicaManager.scala:733
  kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, Map, Function1, Option, Function1) ReplicaManager.scala:471
  kafka.server.KafkaApis.handleProduceRequest(RequestChannel$Request) KafkaApis.scala:489
  kafka.server.KafkaApis.handle(RequestChannel$Request) KafkaApis.scala:113
  kafka.server.KafkaRequestHandler.run() KafkaRequestHandler.scala:69
  java.lang.Thread.run() Thread.java:748
{code}
 

And the following screenshot shows that the process heap is full of Finalizer objects.

!Screen Shot 2019-05-30 at 1.27.25 pm.png!

I think Kafka needs to manually call the System#runFinalization method after closing the GZIP input stream to avoid this full GC.
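A rough sketch of what that workaround could look like (this is a hypothetical illustration, not the actual Kafka code path; the helper name and the threshold constant are invented here). GZIPInputStream owns an Inflater, whose finalize() method is the source of the Finalizer backlog, so the idea is to nudge the finalizer thread periodically after closing such streams:

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPInputStream;

public class GzipFinalizationHelper {
    private static final AtomicLong streamsClosed = new AtomicLong();
    // Hypothetical threshold: how many Inflater-backed streams to close
    // before nudging the finalizer thread. Not taken from the Kafka source.
    private static final long RUN_FINALIZATION_INTERVAL = 10_000;

    public static byte[] readAll(byte[] gzipped) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // GZIPInputStream wraps an Inflater; each Inflater instance carries
        // a finalize() method and therefore a pending Finalizer entry.
        try (GZIPInputStream in =
                 new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
        // Proposed workaround: periodically drain pending Finalizer objects
        // so they cannot accumulate in the old generation and force a full GC.
        if (streamsClosed.incrementAndGet() % RUN_FINALIZATION_INTERVAL == 0) {
            System.runFinalization();
        }
        return out.toByteArray();
    }
}
{code}

Calling System#runFinalization on every close would be too expensive on the request path, hence the interval; an alternative worth considering is calling Inflater#end explicitly when the stream is closed, which releases the native memory without waiting for finalization at all.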



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)