Posted to jira@kafka.apache.org by "Emanuel Velzi (Jira)" <ji...@apache.org> on 2022/01/27 16:18:00 UTC

[jira] [Updated] (KAFKA-13623) Memory leak when multiple poll

     [ https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Emanuel Velzi updated KAFKA-13623:
----------------------------------
    Description: 
Hi, I'm experiencing a kind of memory leak with this simple consumer.

Some info before the code:
    - kafka-clients version: I tried both 2.4.1 and 2.8.1
I only set these properties (a sketch of the full configuration is shown below):
    - bootstrap.servers: my-servers
    - group.id: my-group-id
    - auto.offset.reset: earliest
    - enable.auto.commit: false
    - heartbeat.interval.ms: 300
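
For reference, this is roughly how the configuration referred to as this.kafkaConfiguration is built. It is only a sketch of my setup; the broker list is a placeholder:
{code:java}
// Sketch of the consumer configuration (uses java.util.Properties and
// org.apache.kafka.clients.consumer.ConsumerConfig); broker list is a placeholder.
Properties kafkaConfiguration = new Properties();
kafkaConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-servers:9092");
kafkaConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
kafkaConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
kafkaConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "300");
{code}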

My topic has NUM_PARTITIONS=48 partitions:
{code:java}
public class Test {
    /* ... */
    public void start() {
        // Start one consumer task per partition.
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            startOne();
        }
    }
    public void startOne() {
        LOGGER.info("startOne");
        this.pool.submit(this::startConsumer);
    }
    public void startConsumer() {
        var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer);
        try {
            consumer.subscribe(Collections.singletonList(this.topic));
            consumer.poll(Duration.ofSeconds(30));
            // Simulate a processing failure.
            throw new RuntimeException("Some kind of error");
        } catch (Exception e) {
            LOGGER.error("Error in pool");
        } finally {
            consumer.close();
        }
        scheduleNewConsumer();
    }
    private void scheduleNewConsumer() {
        // Retry with a fresh consumer after 2 seconds.
        scheduledExecutorService.schedule(() -> startOne(), Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
    }
}
{code}
 

In summary, when I get an error while processing a record, I close the consumer and retry by starting a new one.
At that point the direct memory used by the Java process starts to grow indefinitely, until the process is killed.
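
For reference, direct buffer usage can be watched from inside the process with the standard JMX buffer pool bean; the probe below is just a minimal sketch (not Kafka-specific):
{code:java}
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

// Minimal sketch: print how much direct (off-heap) buffer memory the JVM is using.
public class DirectMemoryProbe {
    public static void main(String[] args) {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            // The "direct" pool backs ByteBuffer.allocateDirect(); "mapped" is for memory-mapped files.
            System.out.printf("%s: used=%d bytes, capacity=%d bytes%n",
                    pool.getName(), pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}
{code}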

I tested some other strategies, for example:
 - not closing the consumer and reusing it after a seek(..) (a sketch follows below)
 - not closing the consumer and reusing it by calling consumer.unsubscribe() and then consumer.subscribe(..)

In both cases the memory leak was slower, but it happened anyway.
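
A sketch of what the seek-based variant could look like (the helper name rewindToCommitted and the offset handling are illustrative, not the exact code I ran):
{code:java}
// Sketch of the seek-based reuse: on error, rewind each assigned partition to its last
// committed offset and keep polling with the same consumer instead of closing it.
// Uses org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata.
private void rewindToCommitted(KafkaConsumer<String, String> consumer) {
    for (TopicPartition tp : consumer.assignment()) {
        OffsetAndMetadata committed = consumer.committed(tp);
        consumer.seek(tp, committed != null ? committed.offset() : 0L);
    }
}
{code}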

I also tried this:
{code:java}
public void startConsumer() {
    var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer);
    try {
        consumer.subscribe(Collections.singletonList(this.topic));
        // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
        throw new RuntimeException("Some kind of error");
    } catch (Exception e) {
        LOGGER.error("Error in pool");
    } finally {
        consumer.unsubscribe();
        consumer.subscribe(Collections.singletonList(this.topic));
    }
    scheduleNewConsumer();
}
{code}
 

In other words, I subscribe and unsubscribe the consumer many times without polling anything, and in that case I don't see the memory leak. So I suspect the problem is the poll itself.

Can someone help me with this, please?

> Memory leak when multiple poll
> ------------------------------
>
>                 Key: KAFKA-13623
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13623
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.4.1, 2.8.1
>            Reporter: Emanuel Velzi
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)