Posted to dev@kafka.apache.org by "Manikumar (JIRA)" <ji...@apache.org> on 2017/10/30 12:46:00 UTC

[jira] [Resolved] (KAFKA-2062) Sync Producer, Variable Message Length, Multiple Threads = Direct memory overuse

     [ https://issues.apache.org/jira/browse/KAFKA-2062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-2062.
------------------------------
    Resolution: Auto Closed

Closing inactive issue. The old producer is no longer supported, please upgrade to the Java producer whenever possible.
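For context on the suggested upgrade: the Java producer (org.apache.kafka.clients.producer.KafkaProducer) allocates record batches from a single pool bounded by buffer.memory, shared across all sending threads, so the per-thread buffer growth reported below cannot accumulate the same way. A minimal migration config might look like this (broker address and sizes are illustrative, not recommendations):

```properties
# illustrative Java producer settings
bootstrap.servers=localhost:9092
# total bytes for the shared send-buffer pool (one pool for all threads)
buffer.memory=134217728
# permit the occasional large record, e.g. up to 100 MB
max.request.size=104857600
```

If the pool is exhausted, send() blocks for up to max.block.ms instead of growing unbounded per-thread buffers.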

> Sync Producer, Variable Message Length, Multiple Threads = Direct memory overuse
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-2062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2062
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.1.1
>            Reporter: Michael Braun
>            Assignee: Jun Rao
>
> Using a synchronous producer with multiple threads, each calling .send on the single producer object, each thread ends up maintaining a thread-local direct memory buffer. When message sizes vary (for instance, 99% of messages are 1 MB and 1% are 100 MB), the buffers eventually expand to the largest size on every thread, which can cause an out-of-memory error for direct buffer memory:
> java.lang.OutOfMemoryError: Direct buffer memory
>   at java.nio.Bits.reserveMemory(Bits.java:658) ~[na:1.7.0_67]
>   at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[na:1.7.0_67]
>   at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306) ~[na:1.7.0_67]
>   at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174) ~[na:1.7.0_67]
>   at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[na:1.7.0_67]
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannel.java:493) ~[na:1.7.0_67]
>   at java.nio.channels.SocketChannel.write(SocketChannel.java:493) ~[na:1.7.0_67]
>   at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.network.Send$class.writeCompletely(Transmission.scala:75) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:92) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.SyncProducer.liftedTree$1(SyncProducer.scala:72) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:100) ~[kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) [kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) [kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) [kafka_2.10-0.8.1.1.jar:na]
>   at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) [scala-library-2.10.1.jar:na]
>   at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) [scala-library-2.10.1.jar:na]
>   at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) [scala-library-2.10.1.jar:na]
>   at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) [scala-library-2.10.1.jar:na]
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) [scala-library-2.10.1.jar:na]
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) [scala-library-2.10.1.jar:na]
>   at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) [scala-library-2.10.1.jar:na]
>   at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) [kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) [kafka_2.10-0.8.1.1.jar:na]
>   at kafka.producer.Producer.send(Producer.scala:76) [kafka_2.10-0.8.1.1.jar:na]
>   at kafka.javaapi.producer.Producer.send(Producer.scala:33) [kafka_2.10-0.8.1.1.jar:na]
> <my call here>
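The growth mode in the trace above comes from the JDK's per-thread temporary direct buffer cache (sun.nio.ch.Util.getTemporaryDirectBuffer in the trace), which retains a buffer sized to the largest write each thread has issued. A minimal simulation of that bookkeeping (class name, thread count, and message sizes are illustrative; it tracks capacities only and does not allocate real direct memory):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class DirectBufferGrowth {
    // Stand-in for the JDK's per-thread temporary-buffer cache:
    // each thread's entry grows to the largest message it has "written".
    static final ConcurrentHashMap<Long, Integer> cachedCapacity = new ConcurrentHashMap<>();

    static void write(int messageSize) {
        cachedCapacity.merge(Thread.currentThread().getId(), messageSize, Math::max);
    }

    // Each thread sends 99 small messages and one rare large one,
    // mirroring the 99%/1% size mix from the report.
    static long simulate(int threads, int smallMsg, int largeMsg) throws InterruptedException {
        cachedCapacity.clear();
        List<Thread> pool = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 99; j++) write(smallMsg);
                write(largeMsg); // one oversized message pins the high-water mark
            });
            pool.add(t);
            t.start();
        }
        for (Thread t : pool) t.join();
        // Total direct memory retained across all thread-local caches.
        return cachedCapacity.values().stream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) throws InterruptedException {
        long total = simulate(8, 1 << 20, 100 << 20); // 8 threads, 1 MB / 100 MB
        System.out.println("retained direct memory ~= " + (total >> 20) + " MB");
    }
}
```

Once every thread has seen a single 100 MB message, retention is threads x 100 MB regardless of how rare the large messages are. Later JDKs added -Djdk.nio.maxCachedBufferSize (8u102+) to cap this cache, but for 0.8.x the practical fix is the Java producer's single bounded pool.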



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)