Posted to dev@kafka.apache.org by "Liquan Pei (JIRA)" <ji...@apache.org> on 2016/04/13 03:12:25 UTC

[jira] [Commented] (KAFKA-3552) New Consumer: java.lang.OutOfMemoryError: Direct buffer memory

    [ https://issues.apache.org/jira/browse/KAFKA-3552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15238372#comment-15238372 ] 

Liquan Pei commented on KAFKA-3552:
-----------------------------------

Hi Kanak,

Could you share your consumer configuration? It would also help to know how many topic partitions you are consuming from. Do you see GC activity when this happens? If possible, please also provide the GC log and a heap dump.
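
Since the error concerns direct buffers, which live off-heap rather than in the Java heap, it may also help to log the JVM's direct buffer pool usage around the time of the failure. The following is a minimal sketch using the standard BufferPoolMXBean; the class name DirectBufferStats is hypothetical and not part of Kafka, and the pool name "direct" is an assumption that holds on HotSpot-style JVMs.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

// Hypothetical helper (not part of Kafka): reports JVM buffer pool usage.
public class DirectBufferStats {

    /** Returns the bytes used by the pool named "direct", or -1 if no such pool exists. */
    public static long directMemoryUsed() {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            // Assumption: the direct buffer pool is registered under the name "direct".
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Print every buffer pool the JVM exposes (typically "direct" and "mapped").
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s: count=%d used=%d capacity=%d%n",
                    pool.getName(), pool.getCount(),
                    pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}
```

Comparing the reported usage against the limit set with -XX:MaxDirectMemorySize (if one is configured) should show how close the process is to that limit when the error occurs.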

> New Consumer: java.lang.OutOfMemoryError: Direct buffer memory
> --------------------------------------------------------------
>
>                 Key: KAFKA-3552
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3552
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.1
>            Reporter: Kanak Biscuitwala
>            Assignee: Liquan Pei
>
> I'm running Kafka's new consumer with message handlers that can sometimes take a lot of time to return, and combining that with manual offset management (to get at-least-once semantics). Since poll() is the only way to heartbeat with the consumer, I have a thread that runs every 500 milliseconds that does the following:
> 1) Pause all partitions
> 2) Call poll(0)
> 3) Resume all partitions
> For the record, all accesses to KafkaConsumer are protected by synchronized blocks. This generally works, but I'm occasionally seeing messages like this:
> {code}
> java.lang.OutOfMemoryError: Direct buffer memory
>         at java.nio.Bits.reserveMemory(Bits.java:658)
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>         at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>         at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>         at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>         at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>         at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> {code}
> In addition, when I'm reporting offsets, I'm seeing:
> {code}
> java.lang.OutOfMemoryError: Direct buffer memory
>         at java.nio.Bits.reserveMemory(Bits.java:658)
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>         at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>         at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>         at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>         at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>         at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
> {code}
> Given that I'm just calling the library, this behavior is unexpected.
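
For readers following along, the heartbeat thread described in the report (pause all partitions, call poll(0), resume all partitions, every 500 milliseconds, with all consumer access synchronized) might look roughly like the sketch below. This is not the reporter's code. To keep the example self-contained, PausableConsumer is a hypothetical stand-in for the corresponding KafkaConsumer calls, and HeartbeatSketch, heartbeatOnce, and start are illustrative names. The try/finally is an added design choice so that partitions are resumed even if poll throws.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; not taken from the report or from Kafka.
public class HeartbeatSketch {

    /** Hypothetical stand-in for the three consumer operations the report mentions. */
    public interface PausableConsumer {
        void pauseAll();            // 1) pause all assigned partitions
        void poll(long timeoutMs);  // 2) poll, which also drives the heartbeat
        void resumeAll();           // 3) resume all assigned partitions
    }

    /** Runs one pause / poll(0) / resume cycle while holding the consumer's lock. */
    public static void heartbeatOnce(PausableConsumer consumer) {
        synchronized (consumer) {
            consumer.pauseAll();
            try {
                consumer.poll(0L);
            } finally {
                // Resume even if poll throws, so partitions do not stay paused.
                consumer.resumeAll();
            }
        }
    }

    /** Schedules the heartbeat cycle every 500 ms on a single background thread. */
    public static ScheduledExecutorService start(PausableConsumer consumer) {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> heartbeatOnce(consumer), 0L, 500L, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Note that a scheduled task which throws is not run again by the executor, so an OutOfMemoryError escaping heartbeatOnce would silently stop the heartbeat in this sketch unless it is caught and handled.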


