Posted to user@flink.apache.org by Martijn Visser <ma...@apache.org> on 2022/07/01 10:56:41 UTC
Re: Memory issues with KafkaSink exactly once
I'm looping in Qingsheng to get his opinion.
@Charles do you think https://issues.apache.org/jira/browse/FLINK-28250
matches your situation too?
On Thu, Jun 30, 2022 at 18:56, Charles Tan <ct...@gmail.com> wrote:
> Hi Flink users,
>
> Last week I sent an email about memory issues I was experiencing when
> using KafkaSink with exactly-once delivery guarantees. For a simple Kafka
> source --> Kafka sink job with exactly-once guarantees enabled on the sink,
> the TaskManager heap usage grows steadily until the TaskManager eventually
> fails with “java.lang.OutOfMemoryError: Java heap space”. More details are
> in the forwarded email below. I wanted to raise this issue again this week
> to see if anyone has encountered something similar or has any ideas for
> addressing it.
>
> Thanks,
> Charles
>
> ---------- Forwarded message ---------
> From: Charles Tan <ct...@gmail.com>
> Date: Thu, Jun 23, 2022 at 8:40 PM
> Subject: Memory issues with KafkaSink exactly once
> To: <us...@flink.apache.org>
>
>
> Hi Flink users,
>
> I’m running into memory issues using KafkaSink with exactly-once
> guarantees. My Flink job runs for ~2h30m before the TaskManager fails with
> “java.lang.OutOfMemoryError: Java heap space”. In particular, I’ve noticed
> instances of classes like “org.apache.kafka.common.MetricName” and
> “org.apache.kafka.common.metrics.Sensor” accumulating over time. When I run
> the same application with the delivery guarantee switched to none or
> at-least-once, the memory issues don’t occur. Am I missing some
> configuration for exactly-once, or is there a known memory leak in the
> exactly-once KafkaSink? Below I’ve provided some screenshots from analyzing
> heap dumps taken from my failing job, as well as a code snippet from the
> job itself.
>
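> Since the growth is concentrated in “org.apache.kafka.common.MetricName”
> and “org.apache.kafka.common.metrics.Sensor”, my rough guess is that Kafka
> producer metric objects are being registered faster than they are released.
> As a standalone illustration of that general mechanism (a sketch only, not
> Flink’s internals): every KafkaProducer registers metric objects on
> creation that are only released by close(), so producers created repeatedly
> without being closed make the heap grow in exactly these classes.
>
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class ProducerMetricLeakSketch {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("key.serializer", StringSerializer.class.getName());
>         props.put("value.serializer", StringSerializer.class.getName());
>         for (int i = 0; i < 1_000; i++) {
>             // Each producer registers MetricName/Sensor instances (and a JMX
>             // MBean) that stay reachable until close() is called. Never
>             // closing them reproduces the same growth pattern.
>             new KafkaProducer<String, String>(props);
>         }
>     }
> }
>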
> These are pie charts showing the top classes by memory consumption, taken
> roughly 30 minutes apart:
> [image: Screen Shot 2022-06-23 at 8.23.25 PM.png]
> [image: Screen Shot 2022-06-23 at 8.23.36 PM.png]
> [image: Screen Shot 2022-06-23 at 8.23.44 PM.png]
> Job configuration:
>
> - Flink 1.15.0
> - Scala version 2.12
> - Java version 11
> - taskmanager.memory.process.size=1024m
>
> Code Snippet:
>
> import java.util.Properties;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.base.DeliveryGuarantee;
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.flink.connector.kafka.sink.KafkaSink;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class FlinkTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Read strings from the "input" topic, starting from the earliest offset.
>         KafkaSource<String> source = KafkaSource.<String>builder()
>                 .setBootstrapServers("localhost:9092")
>                 .setTopics("input")
>                 .setGroupId("my-group" + System.currentTimeMillis())
>                 .setStartingOffsets(OffsetsInitializer.earliest())
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .build();
>         DataStream<String> sourceStream = env.fromSource(
>                 source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
>
>         KafkaRecordSerializationSchema<String> serializer = KafkaRecordSerializationSchema.builder()
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .setTopic("output")
>                 .build();
>         // Transactions must commit within this timeout; it must not exceed the
>         // broker's transaction.max.timeout.ms (15 minutes by default).
>         Properties sinkProps = new Properties();
>         sinkProps.put("transaction.timeout.ms", 180000);
>         KafkaSink<String> sink = KafkaSink.<String>builder()
>                 .setBootstrapServers("localhost:9092")
>                 .setKafkaProducerConfig(sinkProps)
>                 .setRecordSerializer(serializer)
>                 .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>                 .build();
>         sourceStream.sinkTo(sink);
>
>         // Checkpoint every 10s; pending transactions are committed when a
>         // checkpoint completes.
>         env.enableCheckpointing(10000);
>         env.getCheckpointConfig().setCheckpointTimeout(60000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
>         env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
>         env.execute("tester");
>     }
> }
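>
> One thing I wasn’t sure about: the Flink documentation recommends setting
> a transactional id prefix that is unique across applications writing to the
> same Kafka cluster when using EXACTLY_ONCE. A sketch of the sink builder
> with that added (the prefix value is just a placeholder):
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setKafkaProducerConfig(sinkProps)
>         .setRecordSerializer(serializer)
>         .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>         .setTransactionalIdPrefix("flink-test") // placeholder value
>         .build();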
>
> Thanks,
> Charles
>
Re: Memory issues with KafkaSink exactly once
Posted by Charles Tan <ct...@gmail.com>.
Hi Martijn,
Yes, that issue matches the behavior I'm seeing. I applied the four lines of
code mentioned in the ticket, rebuilt Flink locally, and used the patched
build to run my example; the memory leak no longer appeared. It seems likely
that fixing FLINK-28250 will also resolve my issue.
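For anyone who wants to watch the trend without taking repeated heap dumps:
the leak also shows up in plain JVM heap readings. A minimal sketch using the
standard MemoryMXBean (it reports the JVM it runs in, so it would need to run
inside, or be attached to, the TaskManager process):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class HeapWatcher {
    public static void main(String[] args) throws InterruptedException {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        while (true) {
            // Used heap in MiB; with the leak this climbs steadily over time.
            long usedMb = memory.getHeapMemoryUsage().getUsed() / (1024 * 1024);
            System.out.println("heap used: " + usedMb + " MiB");
            Thread.sleep(10_000);
        }
    }
}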
Thanks,
Charles
On Fri, Jul 1, 2022 at 3:56 AM Martijn Visser <ma...@apache.org>
wrote:
> I'm looping in Qingsheng to get his opinion.
>
> @Charles do you think https://issues.apache.org/jira/browse/FLINK-28250
> matches your situation too?