Posted to jira@kafka.apache.org by "Emanuel Velzi (Jira)" <ji...@apache.org> on 2022/01/27 16:17:00 UTC

[jira] [Created] (KAFKA-13623) Memory leak with multiple poll

Emanuel Velzi created KAFKA-13623:
-------------------------------------

             Summary: Memory leak with multiple poll 
                 Key: KAFKA-13623
                 URL: https://issues.apache.org/jira/browse/KAFKA-13623
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.8.1, 2.4.1
            Reporter: Emanuel Velzi


Hi, I'm seeing what looks like a memory leak with this simple consumer.

Some info before the code:
    - kafka-clients.version: I tried both 2.4.1 and 2.8.1
I only set these properties:
    - bootstrap.servers: my-servers
    - group.id: my-group-id
    - auto.offset.reset: earliest
    - enable.auto.commit: false
    - heartbeat.interval.ms: 300
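
For reference, the properties listed above could be assembled roughly as follows. This is only a sketch: the class name ConsumerConfigSketch and the method buildConsumerProperties are hypothetical, and it assumes the configuration is passed to the consumer as a java.util.Properties object (the report does not show the type of kafkaConfiguration).

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    /**
     * Builds the consumer settings listed in the report.
     * The two arguments stand in for the placeholder values "my-servers" and "my-group-id".
     */
    public static Properties buildConsumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Start from the beginning of the topic when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        // Offsets are not committed automatically.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("heartbeat.interval.ms", "300");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProperties("my-servers", "my-group-id");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```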

My topic has NUM_PARTITIONS=48 partitions:

 
{code:java}
public class Test {
    /* ... */
    public void start() {
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            startOne();
        }
    }
    public void startOne() {
        LOGGER.info("startOne");
        this.pool.submit(this::startConsumer);
    }
    public void startConsumer() {
        var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer);
        try {
            consumer.subscribe(Collections.singletonList(this.topic));
            consumer.poll(Duration.ofSeconds(30));
            throw new RuntimeException("Some kind of error");
        } catch (Exception e) {
            LOGGER.error("Error in pool");
        } finally {
            consumer.close();
        }
        scheduleNewConsumer();
    }
    private void scheduleNewConsumer() {
        scheduledExecutorService.schedule(() -> startOne(), Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
    }
}
{code}
 

In summary, when I hit an error while processing a record, I close the consumer and retry by starting a new one.
From that moment on, the direct memory used by the Java process grows indefinitely until the process is killed.
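
One way to observe the growth from inside the JVM is to read the "direct" buffer pool through the standard java.lang.management API. The following is a minimal sketch, not part of the reproduction code: the class name DirectMemoryProbe is hypothetical, and it only reports memory held by direct ByteBuffers, which may not cover every native allocation made by the process.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

public class DirectMemoryProbe {

    /**
     * Returns the bytes currently used by the JVM's "direct" buffer pool,
     * or -1 if the JVM does not report such a pool.
     */
    public static long directMemoryUsedBytes() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsedBytes();
        // Allocate 1 MiB of direct memory so the change is visible in the pool.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024);
        long after = directMemoryUsedBytes();
        System.out.println("direct memory before: " + before + " bytes");
        System.out.println("direct memory after:  " + after + " bytes");
        System.out.println("allocated buffer capacity: " + buffer.capacity() + " bytes");
    }
}
```

Logging directMemoryUsedBytes() each time a consumer is closed and a new one is started would show whether the figure keeps rising across retries.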

I tested some other strategies. For example:
- not closing the consumer, and reusing it with a seek(..)
- not closing the consumer, and reusing it by calling consumer.unsubscribe(); and then consumer.subscribe(..);

In both cases the memory leak was slower, but it still happened.

I also tried this:
...

 
{code:java}
public void startConsumer() {
        var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer);
        try {
            consumer.subscribe(Collections.singletonList(this.topic));
            // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
            throw new RuntimeException("Some kind of error");
        } catch (Exception e) {
            LOGGER.error("Error in pool");
        } finally {
            consumer.unsubscribe();
            consumer.subscribe(Collections.singletonList(this.topic));
        }
        scheduleNewConsumer();
    }{code}
 

...

That is, I subscribe and unsubscribe the consumer many times without polling anything. In those cases I don't see the memory leak, so I suspect the problem is the poll itself.


Can someone help me with this, please?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)