You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Emanuel Velzi (Jira)" <ji...@apache.org> on 2022/01/27 16:18:00 UTC

[jira] [Updated] (KAFKA-13623) Memory leak when multiple polls

     [ https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Emanuel Velzi updated KAFKA-13623:
----------------------------------
    Summary: Memory leak when multiple polls  (was: Memory leak with multiple poll )

> Memory leak when multiple polls
> -------------------------------
>
>                 Key: KAFKA-13623
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13623
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.4.1, 2.8.1
>            Reporter: Emanuel Velzi
>            Priority: Major
>
> Hi, I'm experiencing a kind of memory leak with this simple consumer.
> Some info before the code:
>     - kafka-clients.version: I try with 2.4.1 and 2.8.1
> I only set these properties:
>     - bootstrap.servers: my-servers
>     - group.id: my-group-id
>     - auto.offset.reset: earliest
>     - enable.auto.commit: false
>     - heartbeat.interval.ms: 300
> My topic has NUM_PARTITIONS=48 partitions:
>  
> {code:java}
> public class Test {
>     /* ... */
>     public void start() {
>         for (int i = 0; i < NUM_PARTITIONS; i++) {
>             startOne();
>         }
>     }
>     public void startOne() {
>         LOGGER.info("startOne");
>         this.pool.submit(this::startConsumer;
>     }
>     public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer)
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error en pool");
>         } finally {
>             consumer.close();
>         }
>         scheduleNewConsumer();
>     }
>     private void scheduleNewConsumer() {
>         scheduledExecutorService.schedule(() -> startOne(), Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
>     }
> }
> {code}
>  
> In summary, when I have some error processing a record, then I close de consumer and retry, starting a new one. 
> In that moment de Direct Memory used by de java process starts to grow up indefinitly, until the process is killed.
> I test some other strategies. For example:
> - no close the consumer, and reuse it with a seek(..)
> - no close the consumer, and reuse it doing:  consumer.unsubscribe(); and consumer.subscribe(..);
> In both cases the memory leak was slower, but it happened anyway.
> Also I tried this:
> ...
>  
> {code:java}
> public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer)
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error en pool");
>         } finally {
>             consumer.unsubscribe();
>             consumer.subscribe(Collections.singletonList(this.topic));
>         }
>         scheduleNewConsumer();
>     }{code}
>  
> ...
> I mean, multiple times I'm subscribing and unsubscribing the consumer, without poll anything. In those cases I don't experience the memory leak. So, I imagine that the problem is the poll itself.
> Someone can help me with this please?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)