Posted to user@flink.apache.org by Martijn Visser <ma...@apache.org> on 2022/07/01 10:56:41 UTC

Re: Memory issues with KafkaSink exactly once

I'm looping in Qingsheng to get his opinion.

@Charles, do you think https://issues.apache.org/jira/browse/FLINK-28250
matches your situation too?

On Thu, Jun 30, 2022 at 18:56, Charles Tan <ct...@gmail.com> wrote:

> Hi Flink users,
>
> Last week I sent an email about memory issues I was experiencing when
> using KafkaSink with exactly-once delivery guarantees. In a simple Kafka
> source --> Kafka sink job where the sink has exactly-once guarantees
> enabled, TaskManager heap usage keeps growing until the TaskManager
> eventually fails with “java.lang.OutOfMemoryError: Java heap space”. More
> details are in the forwarded email. I wanted to raise this issue again this
> week to see whether anyone has encountered something similar or has any
> ideas for addressing it.
>
> Thanks,
> Charles
>
> ---------- Forwarded message ---------
> From: Charles Tan <ct...@gmail.com>
> Date: Thu, Jun 23, 2022 at 8:40 PM
> Subject: Memory issues with KafkaSink exactly once
> To: <us...@flink.apache.org>
>
>
> Hi Flink users,
>
> I’m running into memory issues using KafkaSink with exactly-once
> guarantees. My Flink job runs for ~2h30m before the TaskManager fails with
> “java.lang.OutOfMemoryError: Java heap space”. In particular, I’ve noticed
> that the memory held by classes like “org.apache.kafka.common.MetricName”
> and “org.apache.kafka.common.metrics.Sensor” grows over time. When I run
> the same application with the delivery guarantee switched to NONE or
> AT_LEAST_ONCE, the memory issues don’t occur. Am I missing some
> configuration for exactly-once, or is there a known memory leak in the
> exactly-once Kafka sink? Below I’ve provided some screenshots from analyzing
> heap dumps taken from my failing job, as well as code snippets from the job
> itself.
>
> These pie charts show the top classes by memory consumption, taken
> roughly 30 minutes apart:
> [image: Screen Shot 2022-06-23 at 8.23.25 PM.png]
> [image: Screen Shot 2022-06-23 at 8.23.36 PM.png]
> [image: Screen Shot 2022-06-23 at 8.23.44 PM.png]
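The analysis above compares heap snapshots taken about 30 minutes apart to see which classes keep growing. As a hedged, stdlib-only sketch of that comparison, the code below takes two class histograms (class name to instance count) and lists the classes whose count grew by at least a threshold, largest growth first. How the histograms are obtained is outside the sketch; one assumed option is parsing `jmap -histo:live <pid>` output captured at two points in time (`:live` forces a full GC before counting). `HistogramDiff` and `grownClasses` are illustrative names, not part of any tool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch: find classes whose instance count grew between two heap histograms. */
public class HistogramDiff {

    /**
     * Returns class names whose count increased by at least minGrowth,
     * ordered by growth (largest first). Classes that shrank are ignored.
     */
    public static List<String> grownClasses(Map<String, Long> before,
                                            Map<String, Long> after,
                                            long minGrowth) {
        List<Map.Entry<String, Long>> growth = new ArrayList<>();
        for (Map.Entry<String, Long> e : after.entrySet()) {
            long old = before.getOrDefault(e.getKey(), 0L); // new classes start from 0
            long delta = e.getValue() - old;
            if (delta >= minGrowth) {
                growth.add(Map.entry(e.getKey(), delta));
            }
        }
        // Largest growth first
        growth.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));

        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Long> e : growth) {
            names.add(e.getKey());
        }
        return names;
    }
}
```

Comparing more than two snapshots and looking for classes that grow every time is a stronger leak signal than a single difference.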
> Job configuration:
>
>    - Flink 1.15.0
>    - Scala 2.12
>    - Java 11
>    - taskmanager.memory.process.size=1024m
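To put `taskmanager.memory.process.size=1024m` in perspective, the sketch below walks through Flink's TaskManager memory model, assuming every other `taskmanager.memory.*` option keeps its Flink 1.15 default: JVM metaspace 256m; JVM overhead 10% of process size clamped to [192m, 1g]; framework heap and framework off-heap 128m each; task off-heap 0; managed memory 40% and network memory 10% (clamped to [64m, 1g]) of total Flink memory. Under those assumptions the JVM heap (framework heap plus task heap) comes out at only about 154 MiB, so steady growth has little room before an OutOfMemoryError. The breakdown printed in the TaskManager log is authoritative; `TaskManagerMemorySketch` is a hypothetical helper.

```java
/**
 * Rough JVM heap estimate for a given taskmanager.memory.process.size,
 * ASSUMING all other taskmanager.memory.* options keep their defaults.
 */
public class TaskManagerMemorySketch {

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    /** Returns the approximate JVM heap (framework heap + task heap) in MiB. */
    public static double jvmHeapMb(double processMb) {
        double metaspace = 256;                               // assumed default
        double jvmOverhead = clamp(0.1 * processMb, 192, 1024);
        double totalFlink = processMb - metaspace - jvmOverhead;

        double frameworkHeap = 128;                           // assumed default
        double frameworkOffHeap = 128;                        // assumed default
        double taskOffHeap = 0;                               // assumed default
        double managed = 0.4 * totalFlink;                    // managed fraction 0.4
        double network = clamp(0.1 * totalFlink, 64, 1024);   // network fraction 0.1

        // Task heap is whatever remains of total Flink memory
        double taskHeap = totalFlink - frameworkHeap - frameworkOffHeap
                - taskOffHeap - managed - network;
        return frameworkHeap + taskHeap;
    }

    public static void main(String[] args) {
        // 1024 - 256 - 192 = 576 MiB total Flink memory; task heap ~25.6 MiB
        System.out.printf("JVM heap ~ %.1f MiB%n", jvmHeapMb(1024));
    }
}
```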
>
> Code Snippet:
>
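> import java.util.Properties;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.base.DeliveryGuarantee;
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.flink.connector.kafka.sink.KafkaSink;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class FlinkTest {
>     // main must be static to be usable as the program entry point
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Source: read the "input" topic as plain strings, starting from the earliest offset
>         KafkaSource<String> source = KafkaSource.<String>builder()
>             .setBootstrapServers("localhost:9092")
>             .setTopics("input")
>             .setGroupId("my-group" + System.currentTimeMillis())
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .setValueOnlyDeserializer(new SimpleStringSchema())
>             .build();
>         DataStream<String> sourceStream = env.fromSource(
>             source,
>             WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
>
>         // Sink: write each string to the "output" topic
>         KafkaRecordSerializationSchema<String> serializer = KafkaRecordSerializationSchema.builder()
>             .setValueSerializationSchema(new SimpleStringSchema())
>             .setTopic("output")
>             .build();
>         Properties sinkProps = new Properties();
>         // Use String values: Properties.getProperty() returns null for non-String values.
>         // The timeout must not exceed the broker's transaction.max.timeout.ms.
>         sinkProps.setProperty("transaction.timeout.ms", "180000");
>         KafkaSink<String> sink = KafkaSink.<String>builder()
>             .setBootstrapServers("localhost:9092")
>             .setKafkaProducerConfig(sinkProps)
>             .setRecordSerializer(serializer)
>             .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>             .build();
>         sourceStream.sinkTo(sink);
>
>         // With EXACTLY_ONCE, Kafka transactions are committed when a checkpoint
>         // completes, so checkpointing (here every 10 s) must be enabled.
>         env.enableCheckpointing(10000);
>         env.getCheckpointConfig().setCheckpointTimeout(60000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
>         env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
>         env.execute("tester");
>     }
> }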
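A rough count of how many checkpoints the job triggers before it fails helps relate the heap growth to the checkpoint cycle. With `enableCheckpointing(10000)` and a run of about 2h30m, at most 9,000,000 ms / 10,000 ms = 900 checkpoints are triggered (fewer if any checkpoint takes longer than the interval). Since each exactly-once checkpoint commits a Kafka transaction per sink subtask, anything retained per transaction would grow roughly linearly with this number. The helper below is a hypothetical sketch of that arithmetic, not part of Flink.

```java
import java.time.Duration;

/** Upper bound on checkpoints triggered during a run with a fixed interval. */
public class CheckpointCountSketch {

    public static long maxCheckpoints(Duration runtime, Duration interval) {
        // Integer division: at most one checkpoint per full interval
        return runtime.toMillis() / interval.toMillis();
    }

    public static void main(String[] args) {
        Duration runtime = Duration.ofHours(2).plusMinutes(30); // ~2h30m until the OOM
        Duration interval = Duration.ofMillis(10_000);          // env.enableCheckpointing(10000)
        System.out.println(maxCheckpoints(runtime, interval));  // 900
    }
}
```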
>
> Thanks,
> Charles
>

Re: Memory issues with KafkaSink exactly once

Posted by Charles Tan <ct...@gmail.com>.
Hi Martijn,

Yes, that issue matches the behavior I'm seeing. I added the four lines of
code mentioned in the ticket, rebuilt Flink locally, and ran my example
against this patched build; it showed no memory leak. I think it's likely
that fixing FLINK-28250 will also resolve my issue.

Thanks,
Charles
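The thread does not spell out what the four-line patch for FLINK-28250 changes, so the sketch below does not reproduce it. It is a hypothetical, stdlib-only illustration of a general pattern consistent with the reported symptoms: if every transactional producer registers metric-like objects in a long-lived registry and those entries are never removed when the producer is closed, the retained objects grow with each checkpoint; removing them on close keeps the count flat. `FakeMetric`, `onProducerCreated`, `onProducerClosed`, and the "one new producer per checkpoint" model are invented for illustration and are not Flink or Kafka APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** HYPOTHETICAL illustration only; not Flink's or Kafka's actual code. */
public class MetricRegistryLeakSketch {

    /** Stand-in for per-producer objects such as Kafka MetricName/Sensor instances. */
    static final class FakeMetric {
        final String producerId;
        final String name;

        FakeMetric(String producerId, String name) {
            this.producerId = producerId;
            this.name = name;
        }
    }

    private static final String[] METRIC_NAMES = {"record-send-rate", "byte-rate", "request-latency"};

    // Long-lived registry that outlives the individual producers it tracks
    private final Map<String, List<FakeMetric>> registry = new HashMap<>();

    void onProducerCreated(String producerId) {
        List<FakeMetric> metrics = new ArrayList<>();
        for (String name : METRIC_NAMES) {
            metrics.add(new FakeMetric(producerId, name));
        }
        registry.put(producerId, metrics);
    }

    void onProducerClosed(String producerId) {
        registry.remove(producerId); // releases the producer's metric objects
    }

    int retainedMetricCount() {
        int total = 0;
        for (List<FakeMetric> metrics : registry.values()) {
            total += metrics.size();
        }
        return total;
    }

    /** Assumes one new producer per checkpoint; optionally releases the previous one. */
    public static int simulate(int checkpoints, boolean releaseOnClose) {
        MetricRegistryLeakSketch sketch = new MetricRegistryLeakSketch();
        for (int cp = 1; cp <= checkpoints; cp++) {
            if (releaseOnClose && cp > 1) {
                sketch.onProducerClosed("producer-" + (cp - 1));
            }
            sketch.onProducerCreated("producer-" + cp);
        }
        return sketch.retainedMetricCount();
    }

    public static void main(String[] args) {
        System.out.println("never released: " + simulate(900, false));   // 2700
        System.out.println("released on close: " + simulate(900, true)); // 3
    }
}
```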
