Posted to user@flink.apache.org by Charles Tan <ct...@gmail.com> on 2022/06/24 03:40:45 UTC

Memory issues with KafkaSink exactly once

Hi Flink users,

I’m running into memory issues using KafkaSink with exactly-once
guarantees. My Flink job runs for ~2h30m before the TaskManager fails with
“java.lang.OutOfMemoryError: Java heap space”. In particular, I’ve noticed
the memory held by classes like “org.apache.kafka.common.MetricName” and
“org.apache.kafka.common.metrics.Sensor” growing over time. When I run the
same application but switch the delivery guarantee to none or
at-least-once, the memory issues don’t occur. Am I missing some
configuration for exactly-once, or is there a known memory leak in the
exactly-once Kafka sink? Below I’ve provided some screenshots from analyzing
heap dumps taken from my failing job, as well as a code snippet from the job
itself.

These are pie charts displaying top classes for memory consumption, taken
roughly 30 mins apart:
[image: Screen Shot 2022-06-23 at 8.23.25 PM.png]
[image: Screen Shot 2022-06-23 at 8.23.36 PM.png]
[image: Screen Shot 2022-06-23 at 8.23.44 PM.png]
Job configuration:

   - Flink 1.15.0
   - scala version 2.12
   - java version 11
   - taskmanager.memory.process.size=1024m

Code Snippet:

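import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkTest {
    // main must be static for the class to be usable as a job entry point.
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: read strings from the "input" topic, starting at the earliest offsets.
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("input")
            .setGroupId("my-group" + System.currentTimeMillis())
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
        DataStream<String> sourceStream = env.fromSource(
            source,
            WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
        // Sink: write the same strings to the "output" topic with exactly-once delivery.
        KafkaRecordSerializationSchema<String> serializer =
            KafkaRecordSerializationSchema.builder()
                .setValueSerializationSchema(new SimpleStringSchema())
                .setTopic("output")
                .build();
        Properties sinkProps = new Properties();
        sinkProps.put("transaction.timeout.ms", 180000);
        KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setKafkaProducerConfig(sinkProps)
            .setRecordSerializer(serializer)
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .build();
        sourceStream.sinkTo(sink);
        // Checkpoint every 10 s with a 60 s timeout.
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
        env.execute("tester");
    }
}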

Thanks,
Charles
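
For anyone who wants to reproduce the class-growth observation from the original message without screenshots, here is a minimal sketch that diffs two class histograms saved as text (for example the output of `jmap -histo <pid>`, taken some time apart). The class name HistogramDiff, its method names, and the file arguments are hypothetical and not part of Flink or Kafka; the parser assumes the usual "num: #instances #bytes class name" row layout.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink or Kafka): diffs two `jmap -histo` text dumps. */
public class HistogramDiff {

    /** Parses histogram text into class name -> instance count; non-data lines are skipped. */
    public static Map<String, Long> parse(String histogram) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : histogram.split("\\R")) {
            String[] cols = line.trim().split("\\s+");
            // Data rows look like: "1:  <instances>  <bytes>  <class name> [(module)]"
            if (cols.length < 4 || !cols[0].endsWith(":")) {
                continue;
            }
            try {
                counts.merge(cols[3], Long.parseLong(cols[1]), Long::sum);
            } catch (NumberFormatException e) {
                // Not a data row after all; ignore it.
            }
        }
        return counts;
    }

    /** Returns the classes under the given prefix whose instance count grew, with the increase. */
    public static Map<String, Long> growth(
            Map<String, Long> before, Map<String, Long> after, String prefix) {
        Map<String, Long> grown = new HashMap<>();
        for (Map.Entry<String, Long> e : after.entrySet()) {
            long delta = e.getValue() - before.getOrDefault(e.getKey(), 0L);
            if (e.getKey().startsWith(prefix) && delta > 0) {
                grown.put(e.getKey(), delta);
            }
        }
        return grown;
    }

    /** Usage: java HistogramDiff <earlier-histogram.txt> <later-histogram.txt> */
    public static void main(String[] args) throws IOException {
        Map<String, Long> before = parse(Files.readString(Path.of(args[0])));
        Map<String, Long> after = parse(Files.readString(Path.of(args[1])));
        growth(before, after, "org.apache.kafka.").entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .forEach(e -> System.out.println("+" + e.getValue() + "  " + e.getKey()));
    }
}
```

If the instance counts of MetricName and Sensor keep rising between dumps under exactly-once but stay flat under at-least-once, that matches the behavior described above.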

Re: Memory issues with KafkaSink exactly once

Posted by Charles Tan <ct...@gmail.com>.
Hi Martijn,

Yes, that issue matches the behavior I'm seeing. I added the four lines of
code mentioned in the ticket and rebuilt Flink locally, then I used this
patched Flink to test my example and it didn't have any memory leaks. I
think it's probable that solving FLINK-28250 will also resolve my issues.

Thanks,
Charles
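
To illustrate why the growth would show up only with exactly-once, here is a toy model of the general leak pattern. It is a sketch under stated assumptions, not the FLINK-28250 patch and not Flink or Kafka code: it assumes each checkpoint ends up with its own producer instance, that every producer registers a fixed number of metric objects, and that those objects are freed only when the producer is closed. ProducerLeakModel, ToyProducer, and the figure of 100 metrics per producer are all made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only: none of these classes are Flink or Kafka APIs. */
public class ProducerLeakModel {

    /** Stands in for the metric objects that are still reachable (what a heap dump would show). */
    static final Map<String, Integer> LIVE_METRICS = new HashMap<>();

    /** Stands in for a producer that registers metrics on creation and frees them on close. */
    static final class ToyProducer implements AutoCloseable {
        private final String id;

        ToyProducer(String id, int metricsPerProducer) {
            this.id = id;
            LIVE_METRICS.put(id, metricsPerProducer);
        }

        @Override
        public void close() {
            LIVE_METRICS.remove(id);
        }
    }

    /** Simulates one new producer per checkpoint; returns the metric objects still live afterwards. */
    public static int simulate(int checkpoints, int metricsPerProducer, boolean closeAfterCommit) {
        LIVE_METRICS.clear();
        for (int i = 0; i < checkpoints; i++) {
            ToyProducer producer = new ToyProducer("txn-" + i, metricsPerProducer);
            if (closeAfterCommit) {
                producer.close();
            }
        }
        return LIVE_METRICS.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        // ~2h30m of runtime at a 10 s checkpoint interval is roughly 900 checkpoints.
        int checkpoints = (150 * 60) / 10;
        System.out.println(simulate(checkpoints, 100, false)); // prints 90000
        System.out.println(simulate(checkpoints, 100, true));  // prints 0
    }
}
```

In this model the live metric count grows linearly with the number of checkpoints unless every producer is closed, which is the shape of growth seen in the heap dumps.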

On Fri, Jul 1, 2022 at 3:56 AM Martijn Visser <ma...@apache.org>
wrote:

> I'm looping in Qingsheng to get his opinion.
>
> @Charles do you think https://issues.apache.org/jira/browse/FLINK-28250
> matches with your situation too?

Re: Memory issues with KafkaSink exactly once

Posted by Martijn Visser <ma...@apache.org>.
I'm looping in Qingsheng to get his opinion.

@Charles do you think https://issues.apache.org/jira/browse/FLINK-28250
matches with your situation too?

Fwd: Memory issues with KafkaSink exactly once

Posted by Charles Tan <ct...@gmail.com>.
Hi Flink users,

Last week I sent an email about memory issues I was experiencing when using
KafkaSink with exactly-once delivery guarantees. For a simple Kafka source
--> Kafka sink job where the sink has exactly-once guarantees enabled, the
TaskManager heap usage grows steadily until the TaskManager fails with
“java.lang.OutOfMemoryError: Java heap space”. More details are in the
forwarded email. I wanted to raise this again this week to see if anyone
has encountered something similar or has any ideas for addressing it.

Thanks,
Charles

Re: Memory issues with KafkaSink exactly once

Posted by Charles Tan <ct...@gmail.com>.
Hi Alexis,

Thanks for the response. I just built the current iteration of Flink
1.15.1 (https://github.com/apache/flink/tree/release-1.15.1-rc1), which
includes a fix for the issue you linked, and ran the example from above
against it. However, I'm still observing the same behavior: the
TaskManager's heap grows until the job eventually crashes. So this issue
is probably separate from FLINK-27487.

Best,
Charles

On Mon, Jun 27, 2022 at 1:33 AM Alexis Sarda-Espinosa <
sarda.espinosa@gmail.com> wrote:

> Hello,
>
> do you think this could be related to
> https://issues.apache.org/jira/browse/FLINK-27487 ?
>
> Regards,
> Alexis.

Re: Memory issues with KafkaSink exactly once

Posted by Alexis Sarda-Espinosa <sa...@gmail.com>.
Hello,

do you think this could be related to
https://issues.apache.org/jira/browse/FLINK-27487 ?

Regards,
Alexis.
