You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Kanak Biscuitwala (JIRA)" <ji...@apache.org> on 2016/04/13 23:22:25 UTC

[jira] [Updated] (KAFKA-3552) New Consumer: java.lang.OutOfMemoryError: Direct buffer memory

     [ https://issues.apache.org/jira/browse/KAFKA-3552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kanak Biscuitwala updated KAFKA-3552:
-------------------------------------
    Attachment: Screen Shot 2016-04-13 at 11.56.05 AM.png
                Screen Shot 2016-04-13 at 2.17.48 PM.png

> New Consumer: java.lang.OutOfMemoryError: Direct buffer memory
> --------------------------------------------------------------
>
>                 Key: KAFKA-3552
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3552
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.1
>            Reporter: Kanak Biscuitwala
>            Assignee: Liquan Pei
>         Attachments: Screen Shot 2016-04-13 at 11.56.05 AM.png, Screen Shot 2016-04-13 at 2.17.48 PM.png
>
>
> I'm running Kafka's new consumer with message handlers that can sometimes take a lot of time to return, and combining that with manual offset management (to get at-least-once semantics). Since poll() is the only way to heartbeat with the consumer, I have a thread that runs every 500 milliseconds that does the following:
> 1) Pause all partitions
> 2) Call poll(0)
> 3) Resume all partitions
> For the record, all accesses to KafkaConsumer are protected by synchronized blocks. This generally works, but I'm occasionally seeing messages like this:
> {code}
> java.lang.OutOfMemoryError: Direct buffer memory
>         at java.nio.Bits.reserveMemory(Bits.java:658)
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>         at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>         at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>         at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>         at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>         at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> {code}
> In addition, when I'm reporting offsets, I'm seeing:
> {code}
> java.lang.OutOfMemoryError: Direct buffer memory
>         at java.nio.Bits.reserveMemory(Bits.java:658)
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>         at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>         at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>         at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>         at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>         at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
> {code}
> Given that I'm just calling the library, this behavior is unexpected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)