Posted to jira@kafka.apache.org by "Emanuel Velzi (Jira)" <ji...@apache.org> on 2022/02/15 17:31:00 UTC
[jira] [Comment Edited] (KAFKA-13623) Memory leak when multiple poll
[ https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492748#comment-17492748 ]
Emanuel Velzi edited comment on KAFKA-13623 at 2/15/22, 5:30 PM:
-----------------------------------------------------------------
Hi, thanks for your reply.
* _Since you're not calling poll in the second example, it's expected that direct memory wouldn't be allocated._
Yes, I was just trying to prove that closing and creating a new consumer every time was not the problem here, and it isn't :)
* _It may be the case that a full GC is needed for the direct memory to get cleaned up._
Can the GC clean up direct memory? I thought the GC had nothing to do with non-heap memory.
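On the GC question: the native memory behind a direct {{ByteBuffer}} is owned by a small on-heap object, and on HotSpot it is released by that object's cleaner once a GC finds the object unreachable. So the GC does not scan direct memory, but it does decide when direct memory is freed. In JDK 8, {{Bits.reserveMemory}} (the top frame of the stack trace in this comment) also calls {{System.gc()}} and retries before it throws {{OutOfMemoryError: Direct buffer memory}}. A minimal sketch that makes this visible through the standard {{BufferPoolMXBean}}; the class name and the 64 MiB size are arbitrary, and {{System.gc()}} is only a request, so the release is not guaranteed (for example under {{-XX:+DisableExplicitGC}}):

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

class DirectMemoryGcDemo {

    /** Returns the JVM's "direct" buffer pool, which tracks memory reserved by direct ByteBuffers. */
    static BufferPoolMXBean directPool() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool;
            }
        }
        throw new IllegalStateException("no 'direct' buffer pool found");
    }

    public static void main(String[] args) throws InterruptedException {
        BufferPoolMXBean direct = directPool();
        long baseline = direct.getMemoryUsed();

        // Off-heap allocation: counted in the direct pool, not in the Java heap.
        ByteBuffer buffer = ByteBuffer.allocateDirect(64 * 1024 * 1024);
        System.out.println("after allocateDirect: +" + (direct.getMemoryUsed() - baseline) + " bytes");

        // Drop the only reference. The native memory is released by the buffer's cleaner,
        // which only runs after a GC finds the on-heap DirectByteBuffer object unreachable.
        buffer = null;
        for (int i = 0; i < 10 && direct.getMemoryUsed() > baseline; i++) {
            System.gc();        // a request, not a guarantee
            Thread.sleep(100);  // give the reference-handler thread time to run the cleaner
        }
        System.out.println("after System.gc(): +" + (direct.getMemoryUsed() - baseline) + " bytes");
    }
}
```

A heap that rarely needs collecting (a 2 GB G1 heap with little allocation, for example) can therefore leave unreachable direct buffers unreleased for a long time.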
*Some extra details*
I'm using +Ubuntu 18.04.5 LTS+
This is the output of "free -m" before I run my app:
!image-2022-01-31-09-06-27-762.png|width=504,height=49!
And this is the output while my app is running:
!image-2022-02-15-14-29-59-371.png|width=499,height=48!
These are my {+}JVM args{+}:
{code:java}
-javaagent:newrelic/newrelic-agent-7.0.1.jar
-Xmx2000m
-Xms2000m
-XX:+UseG1GC
-XX:+AlwaysPreTouch{code}
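The JVM args above set {{-Xmx}} but not {{-XX:MaxDirectMemorySize}}. When that flag is not set, HotSpot caps direct-buffer memory at roughly the maximum heap size ({{Runtime.maxMemory()}}), so with {{-Xmx2000m}} the process may use about 2000 MB of direct memory on top of the heap; that would fit the observation that non-heap memory grows up to a limit. A minimal sketch for checking the effective cap at runtime, assuming a HotSpot JVM (it relies on {{com.sun.management.HotSpotDiagnosticMXBean}}; the class name is made up):

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

class DirectMemoryLimit {

    /**
     * Effective cap on direct ByteBuffer memory. Assumes HotSpot semantics:
     * MaxDirectMemorySize=0 (the default, i.e. not set) means "use the max heap size".
     */
    static long effectiveMaxDirectMemory() {
        HotSpotDiagnosticMXBean hotspot =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        long configured = Long.parseLong(hotspot.getVMOption("MaxDirectMemorySize").getValue());
        return configured > 0 ? configured : Runtime.getRuntime().maxMemory();
    }

    public static void main(String[] args) {
        long mib = 1024 * 1024;
        System.out.println("max heap:          " + Runtime.getRuntime().maxMemory() / mib + " MiB");
        System.out.println("max direct memory: " + effectiveMaxDirectMemory() / mib + " MiB");
    }
}
```

Setting {{-XX:MaxDirectMemorySize}} explicitly makes the cap visible and predictable, but it only moves the point where the {{OutOfMemoryError}} happens unless the actual demand is lower.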
This is the config for the consumer:
{code:java}
[pool-2-thread-45] INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [my-bootstrap-servers]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-test-random-700675988
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = consumer-test-random-700675988
group.instance.id = null
heartbeat.interval.ms = 300
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
{code}
This is the New Relic graph of non-heap memory:
!image-2022-02-15-14-21-38-467.png|width=728,height=252!
And this is roughly what the last log looks like before the app is killed:
{code:java}
Uncaught exception in kafka-coordinator-heartbeat-thread | topic1: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887){code}
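The trace suggests where the direct memory comes from: Kafka's {{NetworkReceive}} reads each response into a heap {{ByteBuffer}}, and the JDK's {{IOUtil.read}} copies socket data through a temporary direct buffer ({{sun.nio.ch.Util.getTemporaryDirectBuffer}}) sized to the heap buffer's remaining space. The JDK caches that temporary buffer per thread, so each thread that has read a large response can keep a direct buffer of that size. A minimal sketch reproducing the effect with a {{java.nio.channels.Pipe}} instead of a Kafka socket (a substitution made to keep it self-contained; the class and method names are hypothetical, and the growth it prints reflects JDK 8 behaviour and may differ on newer JDKs):

```java
import java.io.IOException;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

class TemporaryDirectBufferDemo {

    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    /**
     * Reads one byte from a pipe into a HEAP buffer of the given size.
     * Returns {bytes read, change in direct-pool usage during the read}.
     */
    static long[] readIntoHeapBuffer(int heapBufferSize) throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
            sink.write(ByteBuffer.wrap(new byte[] {42}));
            long before = directMemoryUsed();
            // Heap destination, like the buffer NetworkReceive reads into: the JDK
            // copies through a temporary direct buffer sized to heap.remaining(),
            // even though only one byte is actually available.
            ByteBuffer heap = ByteBuffer.allocate(heapBufferSize);
            int n = source.read(heap);
            return new long[] {n, directMemoryUsed() - before};
        }
    }

    public static void main(String[] args) throws IOException {
        long[] r = readIntoHeapBuffer(32 * 1024 * 1024);
        System.out.println("bytes read: " + r[0] + ", direct pool grew by: " + r[1] + " bytes");
    }
}
```

If this is the mechanism, later JDK 8 updates and newer JDKs accept {{-Djdk.nio.maxCachedBufferSize=<bytes>}}, which stops the JDK from caching temporary buffers above that size (they are freed after use instead); it does not make any single read smaller.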
Note that the app is not always killed; that depends on the memory available on the host.
Non-heap memory seems to grow up to some limit and can then stay there for a long time.
If this is normal, *how can I calculate the maximum amount of non-heap memory needed?*
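There is probably no exact formula, but if the per-thread temporary buffers from the {{sun.nio.ch.Util.getTemporaryDirectBuffer}} frame are the main consumer, a back-of-the-envelope upper bound is (threads that read from Kafka sockets) × (largest response a single read can return). The sketch encodes that under assumptions that are not from the Kafka documentation: each consumer contributes two reading threads (its polling thread and its heartbeat thread, which the trace shows reading responses), and a fetch response is approximately bounded by min({{fetch.max.bytes}}, partitions × {{max.partition.fetch.bytes}}); a single oversized record batch can exceed both limits. The class and method names are hypothetical.

```java
class DirectMemoryEstimate {

    /**
     * Hypothetical rough upper bound on direct memory held in per-thread temporary buffer
     * caches: every reading thread keeps one buffer as large as the biggest response it read.
     */
    static long estimateBytes(int readingThreads, long fetchMaxBytes,
                              int partitions, long maxPartitionFetchBytes) {
        long perRead = Math.min(fetchMaxBytes, partitions * maxPartitionFetchBytes);
        return readingThreads * perRead;
    }

    public static void main(String[] args) {
        int consumers = 48;          // NUM_PARTITIONS consumers, as in the test code
        int threadsPerConsumer = 2;  // assumption: polling thread + heartbeat thread
        long bytes = estimateBytes(consumers * threadsPerConsumer,
                52_428_800L,         // fetch.max.bytes from the config dump
                48,                  // partitions (worst case: one broker leads all of them)
                1_048_576L);         // max.partition.fetch.bytes from the config dump
        System.out.println("~" + bytes / (1024 * 1024) + " MiB");  // prints ~4608 MiB
    }
}
```

With these values the estimate (~4608 MiB) is well above the ~2000 MiB default cap, which would be consistent with the OOM when every consumer starts from the earliest offset of a large backlog; with small responses, the cached buffers stay correspondingly small.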
> Memory leak when multiple poll
> ------------------------------
>
> Key: KAFKA-13623
> URL: https://issues.apache.org/jira/browse/KAFKA-13623
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.4.1, 2.8.1
> Reporter: Emanuel Velzi
> Priority: Major
> Attachments: image-2022-01-31-09-06-27-762.png, image-2022-02-15-14-21-38-467.png, image-2022-02-15-14-29-59-371.png
>
>
> Hi, I'm experiencing a kind of memory leak with this simple consumer.
> Some info before the code:
> - kafka-clients version: I tried with 2.4.1 and 2.8.1
> I only set these properties:
> - bootstrap.servers: my-servers
> - group.id: my-group-id
> - auto.offset.reset: earliest
> - enable.auto.commit: false
> - heartbeat.interval.ms: 300
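> For reference, the five settings above expressed as {{java.util.Properties}}, the form accepted by the {{KafkaConsumer(Properties, Deserializer, Deserializer)}} constructor. A minimal sketch with the placeholder values from the list; the class and method names are hypothetical, and every other setting keeps the client default:

```java
import java.util.Properties;

class ConsumerProps {

    /** The only consumer settings the report mentions; everything else keeps client defaults. */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("heartbeat.interval.ms", "300");  // default is 3000 ms
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, mirroring "my-servers" / "my-group-id" above.
        Properties props = build("my-servers", "my-group-id");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```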
> My topic has NUM_PARTITIONS=48 partitions:
> {code:java}
> import java.time.Duration;
> import java.util.Collections;
> import java.util.concurrent.TimeUnit;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class Test {
>     /* ... */
>     public void start() {
>         // One consumer task per partition
>         for (int i = 0; i < NUM_PARTITIONS; i++) {
>             startOne();
>         }
>     }
>
>     public void startOne() {
>         LOGGER.info("startOne");
>         this.pool.submit(this::startConsumer);
>     }
>
>     public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer);
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             consumer.poll(Duration.ofSeconds(30));
>             // Simulate an error while processing the polled records
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.close();
>         }
>         // Retry with a brand-new consumer
>         scheduleNewConsumer();
>     }
>
>     private void scheduleNewConsumer() {
>         // Start a replacement consumer after 2 seconds
>         scheduledExecutorService.schedule(() -> startOne(), Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
>     }
> }
> {code}
>
> In summary, when there is an error processing a record, I close the consumer and retry by starting a new one.
> From that moment on, the direct memory used by the Java process grows indefinitely until the process is killed.
> I tested some other strategies, for example:
> - not closing the consumer and reusing it with seek(..)
> - not closing the consumer and reusing it via consumer.unsubscribe() followed by consumer.subscribe(..)
> In both cases the memory leak was slower, but it still happened.
> Also I tried this:
> {code:java}
> public void startConsumer(Consumer<String, String> consumer) {
>     // Always using the same consumer instance
>     try {
>         consumer.subscribe(Collections.singletonList(this.topic));
>         // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
>         throw new RuntimeException("Some kind of error");
>     } catch (Exception e) {
>         LOGGER.error("Error!");
>     } finally {
>         consumer.unsubscribe();
>         consumer.subscribe(Collections.singletonList(this.topic));
>     }
>     scheduleNewConsumer();
> }{code}
>
> That is, I subscribe and unsubscribe the consumer many times without polling anything. In that case I don't see the memory leak, so I suspect the problem is the poll itself.
> Can someone help me with this, please?
--
This message was sent by Atlassian Jira
(v8.20.1#820001)