Posted to jira@kafka.apache.org by "Emanuel Velzi (Jira)" <ji...@apache.org> on 2022/01/27 16:18:00 UTC
[jira] [Updated] (KAFKA-13623) Memory leak when multiple polls
[ https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Emanuel Velzi updated KAFKA-13623:
----------------------------------
Summary: Memory leak when multiple polls (was: Memory leak with multiple poll )
> Memory leak when multiple polls
> -------------------------------
>
> Key: KAFKA-13623
> URL: https://issues.apache.org/jira/browse/KAFKA-13623
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.4.1, 2.8.1
> Reporter: Emanuel Velzi
> Priority: Major
>
> Hi, I'm experiencing a kind of memory leak with this simple consumer.
> Some info before the code:
> - kafka-clients.version: I tried with 2.4.1 and 2.8.1
> I only set these properties:
> - bootstrap.servers: my-servers
> - group.id: my-group-id
> - auto.offset.reset: earliest
> - enable.auto.commit: false
> - heartbeat.interval.ms: 300
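> (For reference, a minimal sketch of how this configuration can be assembled with plain java.util.Properties; "my-servers" and "my-group-id" are placeholders, and in the real application this Properties object is what gets passed to the KafkaConsumer constructor:)

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Assemble the consumer configuration listed above.
    // "my-servers" and "my-group-id" are placeholder values.
    public static Properties kafkaConfiguration() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "my-servers");
        props.setProperty("group.id", "my-group-id");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("heartbeat.interval.ms", "300");
        return props;
    }

    public static void main(String[] args) {
        Properties props = kafkaConfiguration();
        // Auto-commit is disabled, so offsets are only committed explicitly.
        System.out.println(props.getProperty("enable.auto.commit")); // prints "false"
    }
}
```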
> My topic has NUM_PARTITIONS=48 partitions:
>
> {code:java}
> public class Test {
>     /* ... */
>
>     public void start() {
>         for (int i = 0; i < NUM_PARTITIONS; i++) {
>             startOne();
>         }
>     }
>
>     public void startOne() {
>         LOGGER.info("startOne");
>         this.pool.submit(this::startConsumer);
>     }
>
>     public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer);
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error in pool", e);
>         } finally {
>             consumer.close();
>         }
>         scheduleNewConsumer();
>     }
>
>     private void scheduleNewConsumer() {
>         scheduledExecutorService.schedule(() -> startOne(), Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
>     }
> }
> {code}
>
> In summary: when there is an error processing a record, I close the consumer and retry by starting a new one.
> At that moment, the direct memory used by the Java process starts to grow indefinitely, until the process is killed.
> I tested some other strategies. For example:
> - not closing the consumer, and reusing it with a seek(..)
> - not closing the consumer, and reusing it by calling consumer.unsubscribe(); and consumer.subscribe(..);
> In both cases the memory leak was slower, but it happened anyway.
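> (To make the first strategy concrete, here is a rough sketch of the reuse-with-seek control flow. The StubConsumer interface below is a hypothetical stand-in for KafkaConsumer, just to show the shape of the retry loop: on failure the same consumer instance is rewound with seek() instead of being closed and replaced.)

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SeekReuseSketch {
    // Hypothetical stand-in for the relevant slice of the KafkaConsumer API.
    interface StubConsumer {
        void poll();            // stands in for consumer.poll(timeout)
        void seek(long offset); // stands in for consumer.seek(partition, offset)
    }

    // Reuse one consumer across retries: on failure, rewind with seek()
    // instead of closing the consumer and creating a new instance.
    static int consumeWithRetries(StubConsumer consumer, int maxRetries) {
        int attempts = 0;
        while (attempts < maxRetries) {
            attempts++;
            try {
                consumer.poll();
                throw new RuntimeException("Some kind of error"); // simulate the failing record
            } catch (RuntimeException e) {
                consumer.seek(0L); // rewind and retry with the SAME consumer
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        AtomicInteger seeks = new AtomicInteger();
        StubConsumer stub = new StubConsumer() {
            public void poll() { /* no-op in this sketch */ }
            public void seek(long offset) { seeks.incrementAndGet(); }
        };
        int attempts = consumeWithRetries(stub, 3);
        System.out.println(attempts + " attempts, " + seeks.get() + " seeks"); // prints "3 attempts, 3 seeks"
    }
}
```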
> Also I tried this:
> ...
>
> {code:java}
> public void startConsumer() {
>     var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer);
>     try {
>         consumer.subscribe(Collections.singletonList(this.topic));
>         // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
>         throw new RuntimeException("Some kind of error");
>     } catch (Exception e) {
>         LOGGER.error("Error in pool", e);
>     } finally {
>         consumer.unsubscribe();
>         consumer.subscribe(Collections.singletonList(this.topic));
>     }
>     scheduleNewConsumer();
> }
> {code}
>
> ...
> I mean, I subscribe and unsubscribe the consumer multiple times, without polling anything. In those cases I don't experience the memory leak, so I imagine that the problem is the poll itself.
> Can someone help me with this, please?
--
This message was sent by Atlassian Jira
(v8.20.1#820001)