Posted to jira@kafka.apache.org by "Emanuel Velzi (Jira)" <ji...@apache.org> on 2022/01/27 16:17:00 UTC
[jira] [Created] (KAFKA-13623) Memory leak with multiple poll
Emanuel Velzi created KAFKA-13623:
-------------------------------------
Summary: Memory leak with multiple poll
Key: KAFKA-13623
URL: https://issues.apache.org/jira/browse/KAFKA-13623
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.8.1, 2.4.1
Reporter: Emanuel Velzi
Hi, I'm experiencing a kind of memory leak with this simple consumer.
Some info before the code:
- kafka-clients.version: I tried with 2.4.1 and 2.8.1
I only set these properties:
- bootstrap.servers: my-servers
- group.id: my-group-id
- auto.offset.reset: earliest
- enable.auto.commit: false
- heartbeat.interval.ms: 300
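For reference, that configuration can be built as a plain java.util.Properties object, roughly like this (a minimal sketch; the broker list and group id are the placeholders from above, not real values):

```java
import java.util.Properties;

public class ConsumerConfigExample {
    // Builds the consumer configuration described in the report.
    // All values are placeholders; substitute your own cluster details.
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "my-servers");
        props.put("group.id", "my-group-id");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "300");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("group.id"));
    }
}
```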
My topic has NUM_PARTITIONS=48 partitions:
{code:java}
public class Test {
    /* ... */
    public void start() {
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            startOne();
        }
    }

    public void startOne() {
        LOGGER.info("startOne");
        this.pool.submit(this::startConsumer);
    }

    public void startConsumer() {
        var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer);
        try {
            consumer.subscribe(Collections.singletonList(this.topic));
            consumer.poll(Duration.ofSeconds(30));
            throw new RuntimeException("Some kind of error");
        } catch (Exception e) {
            LOGGER.error("Error in poll");
        } finally {
            consumer.close();
        }
        scheduleNewConsumer();
    }

    private void scheduleNewConsumer() {
        scheduledExecutorService.schedule(() -> startOne(), Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
    }
}
{code}
In summary, when there is an error while processing a record, I close the consumer and retry by starting a new one.
At that moment the direct memory used by the Java process starts to grow indefinitely, until the process is killed.
I tested some other strategies as well. For example:
- not closing the consumer, and reusing it with a seek(..)
- not closing the consumer, and reusing it via consumer.unsubscribe() followed by consumer.subscribe(..)
In both cases the memory leak was slower, but it still happened.
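For context, the "reuse it with a seek(..)" variant looked roughly like this (a sketch only, not my exact code: it assumes the consumer already has partitions assigned after a failed poll, and rewinds to the committed offset, or the beginning if none):

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SeekRetryExample {
    // Sketch: instead of closing the consumer on error, rewind the
    // same instance's assigned partitions and poll again.
    static void retryWithSeek(KafkaConsumer<String, String> consumer) {
        for (TopicPartition tp : consumer.assignment()) {
            OffsetAndMetadata committed = consumer.committed(tp);
            if (committed != null) {
                consumer.seek(tp, committed.offset());
            } else {
                consumer.seekToBeginning(Collections.singletonList(tp));
            }
        }
        consumer.poll(Duration.ofSeconds(30));
    }
}
```

This avoids creating a new KafkaConsumer per retry, but as noted above it only slowed the leak down rather than eliminating it.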
I also tried this:
...
{code:java}
public void startConsumer() {
    var consumer = new KafkaConsumer<>(this.kafkaConfiguration, this.stringDeserializer, this.stringDeserializer);
    try {
        consumer.subscribe(Collections.singletonList(this.topic));
        // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
        throw new RuntimeException("Some kind of error");
    } catch (Exception e) {
        LOGGER.error("Error in poll");
    } finally {
        consumer.unsubscribe();
        consumer.subscribe(Collections.singletonList(this.topic));
    }
    scheduleNewConsumer();
}
{code}
...
I mean, I'm subscribing and unsubscribing the consumer multiple times, without polling anything. In that case I don't experience the memory leak. So I imagine that the problem is the poll itself.
Can someone help me with this, please?
--
This message was sent by Atlassian Jira
(v8.20.1#820001)