Posted to jira@kafka.apache.org by "huxihx (JIRA)" <ji...@apache.org> on 2017/11/06 07:14:00 UTC

[jira] [Comment Edited] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

    [ https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239981#comment-16239981 ] 

huxihx edited comment on KAFKA-6165 at 11/6/17 7:13 AM:
--------------------------------------------------------

[~ijuma] Do you think this is caused by `AbstractIndex.resize` not freeing the off-heap memory before it creates a new mapped memory region, as shown below?
{code:title=AbstractIndex.scala|borderStyle=solid}
def resize(newSize: Int) {
  inLock(lock) {
    val raf = new RandomAccessFile(file, "rw")
    val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
    val position = mmap.position

    /* Windows won't let us modify the file length while the file is mmapped :-( */
    if (OperatingSystem.IS_WINDOWS)
      forceUnmap(mmap)
    try {
      raf.setLength(roundedNewSize)
      mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
      _maxEntries = mmap.limit / entrySize
      mmap.position(position)
    } finally {
      CoreUtils.swallow(raf.close())
    }
  }
}
{code}

It seems we only free the memory on Windows. Do you think we should do the same as [KAFKA-4614|https://issues.apache.org/jira/browse/KAFKA-4614]?
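
To make the question concrete, here is a minimal sketch of a resize that releases the old mapping on every platform before remapping. It is not the Kafka implementation: `ResizableIndexSketch` and its `unmap` hook are hypothetical names, and the hook only stands in for Kafka's `forceUnmap`. The JDK has no public unmap call, so the sketch deliberately leaves the actual release to whatever the caller passes in.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.util.concurrent.locks.ReentrantLock

// Hypothetical, reduced stand-in for AbstractIndex (illustration only).
// `unmap` is an assumed hook that plays the role of Kafka's forceUnmap.
class ResizableIndexSketch(file: File, entrySize: Int, initialSize: Int,
                           unmap: MappedByteBuffer => Unit) {
  private val lock = new ReentrantLock()
  @volatile private var mmap: MappedByteBuffer = map(roundDown(initialSize))

  // Round down to an exact multiple of the entry size.
  private def roundDown(size: Int): Int = entrySize * (size / entrySize)

  // Set the file length and map the whole file read-write.
  // The mapping stays valid after the file is closed.
  private def map(size: Int): MappedByteBuffer = {
    val raf = new RandomAccessFile(file, "rw")
    try {
      raf.setLength(size.toLong)
      raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0L, size.toLong)
    } finally raf.close()
  }

  def maxEntries: Int = mmap.limit() / entrySize
  def position: Int = mmap.position()
  def put(bytes: Array[Byte]): Unit = { mmap.put(bytes); () }

  def resize(newSize: Int): Unit = {
    lock.lock()
    try {
      val roundedNewSize = roundDown(newSize)
      val pos = mmap.position()
      // Release the old mapping unconditionally, not only on Windows,
      // so it does not linger until the old buffer is garbage collected.
      unmap(mmap)
      mmap = map(roundedNewSize)
      mmap.position(pos)
    } finally lock.unlock()
  }
}
```

The only behavioural difference from the snippet above is that the unmap step is no longer guarded by the Windows check. Whether that is safe depends on no other thread still reading the old buffer, which the sketch does not address.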


> Kafka Brokers goes down with outOfMemoryError.
> ----------------------------------------------
>
>                 Key: KAFKA-6165
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6165
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>         Environment: DCOS cluster with 4 agent nodes and 3 masters.
> agent machine config :
> RAM : 384 GB
> DISK : 4TB
>            Reporter: kaushik srinivas
>         Attachments: kafka_config.txt, stderr_broker1.txt, stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt
>
>
> Performance testing Kafka with end-to-end pipelines of:
> Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1
> Kafka Data Producer -> kafka -> flume -> hdfs -- stream2
> stream1 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> stream2 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> Some important Kafka Configuration :
> "BROKER_MEM": "32768"(32GB)
> "BROKER_JAVA_HEAP": "16384"(16GB)
> "BROKER_COUNT": "3"
> "KAFKA_MESSAGE_MAX_BYTES": "1000012"(1MB)
> "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB)
> "KAFKA_NUM_PARTITIONS": "20"
> "BROKER_DISK_SIZE": "5000" (5GB)
> "KAFKA_LOG_SEGMENT_BYTES": "50000000" (50MB)
> "KAFKA_LOG_RETENTION_BYTES": "5000000000"(5GB)
> Data Producer to kafka Throughput:
> message rate : approx. 5 lakh (500,000) messages/sec across all 3 brokers and topics/partitions.
> message size : approx. 300 to 400 bytes.
> Issues observed with these configs:
> Issue 1:
> stack trace:
> [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to unrecoverable I/O error while handling produce request:  (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 'store_sales-16'
> 	at kafka.log.Log.append(Log.scala:349)
> 	at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
> 	at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> 	at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
> 	at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
> 	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
> 	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> 	at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
> 	at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
> 	at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:425)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Map failed
> 	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
> 	at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
> 	at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> 	at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
> 	at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
> 	at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
> 	at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> 	at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
> 	at kafka.log.Log.roll(Log.scala:771)
> 	at kafka.log.Log.maybeRoll(Log.scala:742)
> 	at kafka.log.Log.append(Log.scala:405)
> 	... 22 more
> Caused by: java.lang.OutOfMemoryError: Map failed
> 	at sun.nio.ch.FileChannelImpl.map0(Native Method)
> 	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)
> 	... 34 more
> 	
> Issue 2 :
> stack trace :
> [2017-11-02 23:55:49,602] FATAL [ReplicaFetcherThread-0-0], Disk error while replicating data for catalog_sales-3 (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaStorageException: I/O exception in append to log 'catalog_sales-3'
> 	at kafka.log.Log.append(Log.scala:349)
> 	at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130)
> 	at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
> 	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:159)
> 	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:141)
> 	at scala.Option.foreach(Option.scala:257)
> 	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:141)
> 	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:138)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:138)
> 	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
> 	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> 	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:136)
> 	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
> 	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: java.io.IOException: Map failed
> 	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
> 	at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
> 	at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> 	at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
> 	at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
> 	at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
> 	at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> 	at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
> 	at kafka.log.Log.roll(Log.scala:771)
> 	at kafka.log.Log.maybeRoll(Log.scala:742)
> 	at kafka.log.Log.append(Log.scala:405)
> 	... 16 more
> Caused by: java.lang.OutOfMemoryError: Map failed
> 	at sun.nio.ch.FileChannelImpl.map0(Native Method)
> 	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)
> 	... 28 more
> 	
> These two exceptions occur continuously across all 3 brokers with the same Kafka configuration.
> The brokers die with these exceptions.
> Attached are the log files for both issues from two brokers.
> Also attached is the Kafka configuration JSON being used.


