Posted to jira@kafka.apache.org by "Emanuel Velzi (Jira)" <ji...@apache.org> on 2022/02/15 17:31:00 UTC

[jira] [Comment Edited] (KAFKA-13623) Memory leak when multiple poll

    [ https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492748#comment-17492748 ] 

Emanuel Velzi edited comment on KAFKA-13623 at 2/15/22, 5:30 PM:
-----------------------------------------------------------------

Hi, thanks for your reply.
 * _Since you're not calling poll in the second example, it's expected that direct memory wouldn't be allocated._

Yes, I was just trying to prove that closing and creating a new consumer every time was not the problem here, and it's not :)
 * _It may be the case that a full GC is needed for the direct memory to get cleaned up._

Can the GC clean up direct memory? I thought the GC had nothing to do with non-heap memory.
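In case it's useful, this is the kind of minimal sketch I plan to use to test whether a full GC actually releases direct memory (assuming the standard "direct" BufferPoolMXBean reflects these allocations; the sizes are just for illustration):
{code:java}
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

public class DirectMemoryCheck {
    public static void main(String[] args) throws Exception {
        BufferPoolMXBean direct = ManagementFactory
                .getPlatformMXBeans(BufferPoolMXBean.class).stream()
                .filter(b -> "direct".equals(b.getName()))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no direct buffer pool"));

        // Allocate ~100 MiB of direct buffers and drop the references.
        for (int i = 0; i < 100; i++) {
            ByteBuffer.allocateDirect(1024 * 1024);
        }
        System.out.println("used after alloc: " + direct.getMemoryUsed());

        // As I understand it, the native memory behind a DirectByteBuffer
        // is only released when the owning object is collected, so a full
        // GC should bring this counter back down.
        System.gc();
        Thread.sleep(1000);
        System.out.println("used after GC:    " + direct.getMemoryUsed());
    }
}
{code}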

 

*Some extra details*

I'm using +Ubuntu 18.04.5 LTS+

This is the output of "free -m" before I run my app:

!image-2022-01-31-09-06-27-762.png|width=504,height=49!

And this is after my app is running:

!image-2022-02-15-14-29-59-371.png|width=499,height=48!

These are my {+}JVM args{+}:
{code:java}
   -javaagent:newrelic/newrelic-agent-7.0.1.jar
   -Xmx2000m
   -Xms2000m
   -XX:+UseG1GC
   -XX:+AlwaysPreTouch{code}
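Note that I'm not setting -XX:MaxDirectMemorySize; as far as I know, when it's unset the direct memory limit defaults to roughly the max heap size (so ~2000m here). If I wanted to bound it explicitly and surface the OOM sooner, I could add something like this (the 512m value is just an example):
{code:java}
   -XX:MaxDirectMemorySize=512m{code}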
 

This is the config for the consumer:
{code:java}
[pool-2-thread-45] INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [my-bootstrap-servers]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-test-random-700675988
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = consumer-test-random-700675988
    group.instance.id = null
    heartbeat.interval.ms = 300
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

{code}
 

This is the New Relic graph about non-heap memory:

!image-2022-02-15-14-21-38-467.png|width=728,height=252!

And the last log before the app is killed looks something like:
{code:java}
Uncaught exception in kafka-coordinator-heartbeat-thread | topic1: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887){code}
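If I read the trace right, the allocation goes through sun.nio.ch.Util.getTemporaryDirectBuffer, which as far as I understand keeps a per-thread cache of temporary direct buffers sized to the largest read that thread has performed. If that cache is what's accumulating across my consumer threads, I believe it can be bounded with the following system property (I haven't verified this on my workload; the 256 KiB value is just an example):
{code:java}
   -Djdk.nio.maxCachedBufferSize=262144{code}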
Please consider that sometimes the app is not killed; that depends on the available memory on the host.

The non-heap memory seems to grow up to a limit, and then it can stay there for a long time.

If this is normal, *how can I calculate the max amount of non-heap memory needed?*
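My own back-of-envelope attempt, assuming (and this is an assumption on my part, not something I've confirmed in the client code) that each consumer's network thread can end up holding a cached temporary direct buffer close to fetch.max.bytes:
{code:java}
public class DirectMemoryEstimate {
    public static void main(String[] args) {
        long perThread = 52_428_800L;           // fetch.max.bytes = 50 MiB
        int consumers  = 48;                    // one consumer thread per partition
        long worstCase = perThread * consumers; // = 2,516,582,400 bytes
        // 48 threads * 50 MiB = 2400 MiB of direct memory in the worst case
        System.out.printf("worst case ~ %d MiB%n", worstCase / (1024 * 1024));
    }
}
{code}
That would be on the order of the growth I'm seeing, but I'd appreciate confirmation that this is the right way to reason about it.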



> Memory leak when multiple poll
> ------------------------------
>
>                 Key: KAFKA-13623
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13623
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.4.1, 2.8.1
>            Reporter: Emanuel Velzi
>            Priority: Major
>         Attachments: image-2022-01-31-09-06-27-762.png, image-2022-02-15-14-21-38-467.png, image-2022-02-15-14-29-59-371.png
>
>
> Hi, I'm experiencing a kind of memory leak with this simple consumer.
> Some info before the code:
>     - kafka-clients.version: I tried with 2.4.1 and 2.8.1
> I only set these properties:
>     - bootstrap.servers: my-servers
>     - group.id: my-group-id
>     - auto.offset.reset: earliest
>     - enable.auto.commit: false
>     - heartbeat.interval.ms: 300
> My topic has NUM_PARTITIONS=48 partitions:
> {code:java}
> public class Test {
>     /* ... */
>     public void start() {
>         for (int i = 0; i < NUM_PARTITIONS; i++) {
>             startOne();
>         }
>     }
>     public void startOne() {
>         LOGGER.info("startOne");
>         this.pool.submit(this::startConsumer);
>     }
>     public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer);
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.close();
>         }
>         scheduleNewConsumer();
>     }
>     private void scheduleNewConsumer() {
>         scheduledExecutorService.schedule(() -> startOne(), Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
>     }
> }
> {code}
>  
> In summary, when I hit some error processing a record, I close the consumer and retry, starting a new one.
> At that moment the direct memory used by the Java process starts to grow indefinitely, until the process is killed.
> I tested some other strategies. For example:
>  - not closing the consumer, and reusing it with a seek(..)
>  - not closing the consumer, and reusing it by doing:  consumer.unsubscribe(); and consumer.subscribe(..);
> In both cases the memory leak was slower, but it happened anyway.
> Also I tried this:
> {code:java}
> public void startConsumer(Consumer consumer) {
>     /* always using the same consumer */
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.unsubscribe();
>             consumer.subscribe(Collections.singletonList(this.topic));
>         }
>         scheduleNewConsumer();
>     }{code}
>  
> I mean, I'm subscribing and unsubscribing the consumer multiple times, without polling anything. In those cases I don't experience the memory leak. So I imagine that the problem is the poll itself.
> Can someone help me with this, please?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)