Posted to jira@kafka.apache.org by "Gao Fei (Jira)" <ji...@apache.org> on 2022/07/22 06:26:00 UTC

[jira] [Commented] (KAFKA-14088) KafkaChannel memory leak

    [ https://issues.apache.org/jira/browse/KAFKA-14088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569832#comment-17569832 ] 

Gao Fei commented on KAFKA-14088:
---------------------------------

Later, by reading the Kafka source code together with the logs, we found that Kafka had apparently received a large number of malformed packets. When such packets arrive over long-lived connections, Kafka buffers them as if they were normal request data until memory runs out; only later in processing does it discover that the packets are malformed and cannot be parsed.
We then reproduced the problem with the command nmap -p 9092 -T4 -A -v ip: the malformed traffic it generates quickly triggers the memory overflow described above. On inquiry, the customer confirmed they had run a vulnerability-scanning tool on site, and each scan crashed Kafka. Can this be avoided by enabling SASL? And when Kafka itself receives such malformed traffic, could it detect the invalid format up front and close the connection immediately, instead of buffering large amounts of data first?
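As a simplified sketch (my own illustration, not Kafka's actual parsing code) of why scanner traffic can look like a gigantic request: Kafka frames each request as a 4-byte big-endian length followed by the payload, so the first bytes of an arbitrary probe are interpreted as a length prefix:

```java
import java.nio.ByteBuffer;

// Illustrative only: shows how a size-prefixed wire protocol can be fooled
// by arbitrary bytes. A scanner's probe bytes are read as a "request" whose
// declared length may be huge, so the broker sizes a buffer accordingly and
// then waits for payload bytes that never arrive.
public class FramingDemo {
    // Interpret the first 4 bytes of a raw packet as the request length.
    static int declaredLength(byte[] raw) {
        return ByteBuffer.wrap(raw).getInt();
    }

    public static void main(String[] args) {
        // Bytes an HTTP-speaking probe might send: "GET " ...
        byte[] probe = "GET / HTTP/1.1\r\n".getBytes();
        // 'G','E','T',' ' = 0x47455420 = 1,195,725,856, i.e. over 1 GB
        System.out.println(declaredLength(probe));
    }
}
```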
Here is some of the relevant error log output:
{code:java}
[2022-07-21 14:33:18,664] ERROR Exception while processing request from 177.177.113.129:6667-172.36.28.103:65440-406 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error parsing request header. Our best guess of the apiKey is: 27265
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'client_id': Error reading string of length 513, only 103 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77)
    at org.apache.kafka.common.requests.RequestHeader.parse(RequestHeader.java:121)
    at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:844)
    at kafka.network.Processor.$anonfun$processCompletedReceives$1$adapted(SocketServer.scala:840)
    at kafka.network.Processor$$Lambda$1000/0x0000000058005440.apply(Unknown Source)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at kafka.network.Processor.processCompletedReceives(SocketServer.scala:840)
    at kafka.network.Processor.run(SocketServer.scala:731)
    at java.lang.Thread.run(Thread.java:823)
[2022-07-21 14:33:18,727] ERROR Closing socket for 177.177.113.129:6667-172.36.28.103:30646-406 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Unknown API key -173
[2022-07-21 14:33:18,727] ERROR Exception while processing request from 177.177.113.129:6667-172.36.28.103:30646-406 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Unknown API key -173
[2022-07-21 14:39:56,995] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:703)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:128)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:830)
    at kafka.network.Processor.run(SocketServer.scala:730)
    at java.lang.Thread.run(Thread.java:823){code}
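One partial mitigation worth checking (my assumption, not something verified against this ticket): the broker settings below bound how much a single bogus length prefix can cause the broker to allocate. socket.request.max.bytes caps the declared size of one request, and queued.max.request.bytes (from KIP-72) bounds the total memory held by in-flight requests. SASL alone would not obviously help, since the length prefix is read before authentication completes. For example, in server.properties:

```properties
# Reject any request whose declared size exceeds this cap
# (default 104857600, i.e. 100 MB); a garbage length prefix can then
# cause at most this much to be buffered per connection.
socket.request.max.bytes=104857600

# Bound the total memory used for queued in-flight requests
# (-1, the default, disables the bound).
queued.max.request.bytes=134217728
```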

> KafkaChannel memory leak
> ------------------------
>
>                 Key: KAFKA-14088
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14088
>             Project: Kafka
>          Issue Type: Bug
>          Components: network
>    Affects Versions: 2.2.1
>         Environment: Current system environment:
> kafka version: 2.2.1
> openjdk(openj9): jdk1.8
> Heap memory: 6.4GB
> MaxDirectSize: 8GB
> Total number of topics: about 150+, each with about 3 partitions
>            Reporter: Gao Fei
>            Priority: Minor
>
> The Kafka broker reports OutOfMemoryError: Java heap space and OutOfMemoryError: Direct buffer memory at the same time. A memory dump shows that the dominant objects are KafkaChannel->NetworkReceive->HeapByteBuffer: there are about 4 such KafkaChannels, each holding roughly 1.5 GB, while total heap is only 6.4 GB.
> It is strange that a single KafkaChannel occupies so much heap memory. Isn't each batch request drained by the RequestHandler threads and written to disk? Normally this memory in KafkaChannel should be released continuously, but it is not.
> Why is there such a large HeapByteBuffer in each KafkaChannel, and what does it store? Shouldn't the socket communication here mostly use direct memory? Why is so much heap memory used instead, and why is it never released?
> The business data volume is not large, and it differs per customer: some customers hit this OOM, while others with larger data volumes do not.
> java.lang.OutOfMemoryError: Direct buffer memory
>     at java.nio.Bits.reserveMemory(Bits.java:693)
>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>     at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>     at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>     at kafka.network.Processor.poll(SocketServer.scala:863)
>     at kafka.network.Processor.run(SocketServer.scala:762)
>     at java.lang.Thread.run(Thread.java:745)
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at org.apache.kafka.common.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>     at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>     at kafka.network.Processor.poll(SocketServer.scala:863)
>     at kafka.network.Processor.run(SocketServer.scala:762)
>     at java.lang.Thread.run(Thread.java:745)
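The large HeapByteBuffers in the quoted dump match how the network layer allocates receives: NetworkReceive reads a 4-byte size and then asks a MemoryPool for a heap buffer of that size, and the default pool (the MemoryPool$1 visible in the stack trace) is unbounded. A simplified sketch of that behavior (stand-in types, not Kafka's actual classes):

```java
import java.nio.ByteBuffer;

// Illustrative stand-in: the default pool hands out a heap buffer of
// exactly the requested size, with no upper bound. If the declared size
// came from garbage bytes, the broker allocates an equally large heap
// buffer that is only freed when the receive completes or the channel
// is closed.
public class PoolDemo {
    interface MemoryPool {
        ByteBuffer tryAllocate(int sizeBytes);
    }

    // Simplified stand-in for the unbounded default pool.
    static final MemoryPool NONE = ByteBuffer::allocate;

    public static void main(String[] args) {
        int declared = 16 * 1024 * 1024; // whatever the (possibly bogus) prefix says
        ByteBuffer buf = NONE.tryAllocate(declared);
        System.out.println(buf.capacity());
    }
}
```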



--
This message was sent by Atlassian Jira
(v8.20.10#820010)