You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "David Mao (Jira)" <ji...@apache.org> on 2022/01/30 14:41:00 UTC

[jira] [Comment Edited] (KAFKA-13623) Memory leak when multiple poll

    [ https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484366#comment-17484366 ] 

David Mao edited comment on KAFKA-13623 at 1/30/22, 2:40 PM:
-------------------------------------------------------------

direct memory is allocated when java applications perform IO with heap byte buffers. Since you're not calling poll in the second example, it's expected that direct memory wouldn't be allocated.

It's a little weird that the direct memory allocation keeps growing, since the direct memory is tied to the lifetime of a thread. It may be the case that a full GC is needed for the direct memory to get cleaned up.

What is killing the process here, can you describe the application environment?

Likewise, what JVM args are you running?


was (Author: david.mao):
Can you add

*-XX:+HeapDumpOnOutOfMemoryError*

to the consumer app java args and upload the heap dump?

> Memory leak when multiple poll
> ------------------------------
>
>                 Key: KAFKA-13623
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13623
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.4.1, 2.8.1
>            Reporter: Emanuel Velzi
>            Priority: Major
>
> Hi, I'm experiencing a kind of memory leak with this simple consumer.
> Some info before the code:
>     - kafka-clients.version: I try with 2.4.1 and 2.8.1
> I only set these properties:
>     - bootstrap.servers: my-servers
>     - group.id: my-group-id
>     - auto.offset.reset: earliest
>     - enable.auto.commit: false
>     - heartbeat.interval.ms: 300
> My topic has NUM_PARTITIONS=48 partitions:
> {code:java}
> public class Test {
>     /* ... */
>     public void start() {
>         for (int i = 0; i < NUM_PARTITIONS; i++) {
>             startOne();
>         }
>     }
>     public void startOne() {
>         LOGGER.info("startOne");
>         this.pool.submit(this::startConsumer;
>     }
>     public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer)
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.close();
>         }
>         scheduleNewConsumer();
>     }
>     private void scheduleNewConsumer() {
>         scheduledExecutorService.schedule(() -> startOne(), Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
>     }
> }
> {code}
>  
> In summary, when I have some error processing a record, then I close de consumer and retry, starting a new one. 
> In that moment the Direct Memory used by de java process starts to grow up indefinitely, until the process is killed.
> I test some other strategies. For example:
>  - no close the consumer, and reuse it with a seek(..)
>  - no close the consumer, and reuse it doing:  consumer.unsubscribe(); and consumer.subscribe(..);
> In both cases the memory leak was slower, but it happened anyway.
> Also I tried this:
> {code:java}
> public void startConsumer(Consumer consumer) {
>  /*always using the same consumer*/
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.unsubscribe();
>             consumer.subscribe(Collections.singletonList(this.topic));
>         }
>         scheduleNewConsumer();
>     }{code}
>  
> I mean, multiple times I'm subscribing and unsubscribing the consumer, without poll anything. In those cases I don't experience the memory leak. So, I imagine that the problem is the poll itself.
> Someone can help me with this please?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)