Posted to user@flink.apache.org by Terry Heathcote <te...@gmail.com> on 2021/12/21 18:53:08 UTC

Flink Checkpoint Duration/Job Throughput

Hi

We are seeing poor record throughput that we believe is caused by long
checkpoint durations. The job reads from and writes to Kafka, and enriches
each record in a transformation by calling a Redis-backed service in the same
cluster before writing the record back to Kafka. Below is a description of
the job:

   - Flink Version 1.12.5
   - Source topic = 24 partitions.
   - Multiple sink topics.
   - Parallelism set to 24.
   - Operators applied are a map function and process function to fetch the
   Redis data.
   - EXACTLY_ONCE processing is required.

We have switched between aligned and unaligned checkpoints, with no
improvement in performance. On average, most operators and their subtasks
acknowledge a checkpoint within milliseconds, but 1 or 2 subtasks wait 2 to 4
minutes before acknowledging it. The subtask load also appears skewed after
the transformations and before the sinks (we tried rebalance and shuffle
there, with no improvement). Checkpoint duration varies between 5 seconds and
7 minutes.
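
A checkpoint completes only once every subtask has acknowledged it, so a
single straggler sets the end-to-end duration. The toy model below (plain
Java, not Flink code) shows this; the 50 ms figure and the index of the slow
subtask are hypothetical, and the 4-minute straggler mirrors the upper end
reported above.

```java
import java.util.Arrays;

/** Toy model: a checkpoint is complete only after all subtasks have acknowledged it. */
final class CheckpointAckModel {

    /** The end-to-end duration is governed by the slowest acknowledgement. */
    static long endToEndMillis(long[] ackMillis) {
        return Arrays.stream(ackMillis).max().orElse(0L);
    }

    public static void main(String[] args) {
        long[] acks = new long[24];   // parallelism of 24, as in the job description
        Arrays.fill(acks, 50L);       // hypothetical: most subtasks acknowledge in ~50 ms
        acks[7] = 4 * 60_000L;        // hypothetical straggler that needs 4 minutes
        System.out.println(endToEndMillis(acks) + " ms");   // prints "240000 ms"
    }
}
```

In this model the 23 fast subtasks do not matter at all, which is why the
per-subtask view of the slow checkpoints is the place to look first.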

We believe this is slowing overall job throughput: Kafka transaction commits
are delayed by the slow checkpoints, which creates backpressure upstream and
a growing lag on the source Kafka topic. Ideally, we would like to decrease
the checkpoint interval once durations are low and stable.
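
With an EXACTLY_ONCE Kafka sink, records are written inside a transaction
that is committed only after the corresponding checkpoint completes. As a
rough model, a record written just after one checkpoint barrier waits about
one checkpoint interval plus one checkpoint duration before it is committed.
The sketch below does that arithmetic; the 60-second interval is an
assumption (the thread does not state it), while the 5-second and 7-minute
durations are the ones reported above.

```java
/** Rough model of commit delay for a transactional sink that commits on checkpoint completion. */
final class CommitLatencyModel {

    /** Approximate worst case: wait for the next checkpoint to start, then for it to finish. */
    static long worstCaseVisibilityMillis(long checkpointIntervalMillis, long checkpointDurationMillis) {
        return checkpointIntervalMillis + checkpointDurationMillis;
    }

    public static void main(String[] args) {
        long interval = 60_000L;   // hypothetical checkpoint interval
        System.out.println(worstCaseVisibilityMillis(interval, 5_000L));        // prints 65000
        System.out.println(worstCaseVisibilityMillis(interval, 7 * 60_000L));   // prints 480000
    }
}
```

Under these assumptions, shortening the interval only helps once the
checkpoint duration itself is small, which matches the intent stated above.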

Any help on this would be greatly appreciated.

Best regards
Terry

Re: Flink Checkpoint Duration/Job Throughput

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

I see that there is no keyBy in your user code. Do some Kafka partitions
contain a lot more data than others? If so, you can try
datastream.rebalance() [1] to redistribute the records evenly across the
parallel subtasks and reduce the impact of data skew.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#rebalance--
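
To illustrate what rebalance() aims for: it hands records to the downstream
subtasks in round-robin order, so uneven partitions are spread out. The
following is a plain-Java toy model, not Flink code. The record counts are
hypothetical, and the fixed starting channel per source subtask is a
simplification chosen to keep the result deterministic.

```java
import java.util.Arrays;

/** Toy model comparing a forward connection with round-robin redistribution. */
final class RebalanceModel {

    /** Forward: downstream subtask i receives exactly what source subtask i read. */
    static long[] forward(long[] perPartition) {
        return perPartition.clone();
    }

    /** Round-robin: each source subtask cycles through all downstream subtasks. */
    static long[] roundRobin(long[] perPartition, int downstream) {
        long[] load = new long[downstream];
        for (int src = 0; src < perPartition.length; src++) {
            long base = perPartition[src] / downstream;   // full cycles
            long rest = perPartition[src] % downstream;   // records in the last, partial cycle
            for (int d = 0; d < downstream; d++) {
                int target = (src + d) % downstream;      // simplified start channel: src
                load[target] += base + (d < rest ? 1 : 0);
            }
        }
        return load;
    }

    static long max(long[] load) {
        return Arrays.stream(load).max().orElse(0L);
    }

    public static void main(String[] args) {
        long[] perPartition = new long[24];
        Arrays.fill(perPartition, 1_000L);   // hypothetical: most partitions are small
        perPartition[3] = 100_000L;          // hypothetical hot partitions
        perPartition[17] = 100_000L;
        System.out.println("forward max load:     " + max(forward(perPartition)));
        System.out.println("round-robin max load: " + max(roundRobin(perPartition, 24)));
    }
}
```

The model only evens out record counts. It says nothing about per-record
cost, so skew caused by slow lookups or slow sink commits would remain.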


Re: Flink Checkpoint Duration/Job Throughput

Posted by Terry Heathcote <te...@gmail.com>.
Hi Caizhi

Thank you for the response. Below is the relevant code for the pipeline, as
requested, along with the Kafka properties we set for both the
FlinkKafkaProducer and the FlinkKafkaConsumer. The operators that suffer from
the data skew are the sinks.

import Models.BlueprintCacheDataType;
import PipelineBlocks.*;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.*;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.jetbrains.annotations.Nullable;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;


public class IngestionStreamer {
    private StreamExecutionEnvironment env;
    private KafkaConfig config;
    private String commanderUrl;
    private static final Logger LOG = LoggerFactory.getLogger(IngestionStreamer.class);

    public IngestionStreamer(StreamExecutionEnvironment env, KafkaConfig config, String commanderUrl) {
        this.env = env;
        this.config = config;
        this.commanderUrl = commanderUrl;
    }

    public void StartStreamer(String topic) {

        // Source: read raw strings from the Kafka topic.
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), config.kafkaPropertiesConsumer);
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // Map function, followed by the process function that fetches the Redis data.
        DataStream<BlueprintCacheDataType> result = kafkaStream
                .map(new KeySorter()).uid("Mapper")
                .process(new FetchBlueprintInfoKeyed(commanderUrl)).uid("BP_Fetch");

        // Fan out the enriched stream into two streams of Kafka producer records.
        DataStream<ProducerRecord<byte[], byte[]>> blueprints =
                result.process(new MapToBlueprints()).uid("Blueprint map");
        DataStream<ProducerRecord<byte[], byte[]>> swamp =
                result.process(new ToSwamp()).uid("Swamp map");

        // Transactional sink; the target topic is taken from each record.
        blueprints.addSink(new FlinkKafkaProducer<>(
                "default-blueprint",
                new KafkaSerializationSchema<ProducerRecord<byte[], byte[]>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(
                            ProducerRecord<byte[], byte[]> producerRecord, @Nullable Long aLong) {
                        return new ProducerRecord<>(producerRecord.topic(), producerRecord.value());
                    }
                },
                config.kafkaPropertiesProducer,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        )).name("Blueprint sink").uid("BP_Sink");

        // Second transactional sink, configured the same way.
        swamp.addSink(new FlinkKafkaProducer<>(
                "default-swamp",
                new KafkaSerializationSchema<ProducerRecord<byte[], byte[]>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(
                            ProducerRecord<byte[], byte[]> producerRecord, @Nullable Long aLong) {
                        return new ProducerRecord<>(producerRecord.topic(), producerRecord.value());
                    }
                },
                config.kafkaPropertiesProducer,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        )).name("Swamp sink").uid("Swamp_Sink");
    }
}


import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaConfig {

    public Properties kafkaPropertiesConsumer;
    public Properties kafkaPropertiesProducer;

    public KafkaConfig(Main.Environment env) {
        kafkaPropertiesConsumer = addKafkaConsumerProperties(KafkaProperties(env));
        kafkaPropertiesProducer = addKafkaProducerProperties(KafkaProperties(env));
    }

    public Properties KafkaProperties(Main.Environment env) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers",
                "kafka-0-external:9094,kafka-1-external:9094,kafka-2-external:9094,kafka-3-external:9094,kafka-4-external:9094");
        properties.setProperty("group.id", System.getenv("KAFKA_GROUP_ID"));
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("sasl.mechanism", "PLAIN");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("session.timeout.ms", "300000");
        properties.setProperty("default.api.timeout.ms", "300000");
        properties.setProperty("request.timeout.ms", "300000");
        properties.setProperty("flink.partition-discovery.interval-millis", "50000");
        return properties;
    }

    public Properties addKafkaConsumerProperties(Properties properties) {
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        properties.setProperty("max.poll.interval.ms", "300000");
        return properties;
    }

    public Properties addKafkaProducerProperties(Properties properties) {
        properties.setProperty("transaction.timeout.ms", "900000");
        properties.setProperty("max.block.ms", "600000");
        properties.setProperty("delivery.timeout.ms", "300000");
        return properties;
    }
}
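
One setting worth sanity-checking against the reported checkpoint times is
transaction.timeout.ms. With an EXACTLY_ONCE sink, a transaction stays open
until the next checkpoint completes, and Kafka aborts transactions that
outlive this timeout. The sketch below computes the remaining headroom in
plain Java. The helper name and the 60-second checkpoint interval are
assumptions for illustration; the 900000 ms timeout and the 7-minute worst
case come from the thread.

```java
import java.util.Properties;

/** Sketch: how much slack the producer transaction timeout leaves over the slowest checkpoint. */
final class TransactionTimeoutCheck {

    /** Timeout minus (one checkpoint interval + the longest observed checkpoint duration). */
    static long headroomMillis(Properties producerProps,
                               long checkpointIntervalMillis,
                               long maxCheckpointDurationMillis) {
        long timeout = Long.parseLong(producerProps.getProperty("transaction.timeout.ms"));
        return timeout - (checkpointIntervalMillis + maxCheckpointDurationMillis);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", "900000");   // value set in KafkaConfig
        long interval = 60_000L;       // hypothetical checkpoint interval
        long slowest = 7 * 60_000L;    // longest checkpoint duration reported in the thread
        System.out.println("headroom: " + headroomMillis(props, interval, slowest) + " ms");
        // prints "headroom: 420000 ms"
    }
}
```

A negative or small result would suggest that slow checkpoints risk aborted
transactions. Restart time after a failure also counts against the same
timeout, and the model above ignores it.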


Best regards
Terry


Re: Flink Checkpoint Duration/Job Throughput

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

From your description this is due to data skew. The common solution to data
skew is to add a random value to your partition keys so that data can be
distributed evenly into downstream operators.

Could you provide more information about your job (preferably user code or
SQL code), especially the operator that suffers from data skew?
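
The key-salting idea mentioned above can be sketched in plain Java. This is
a toy model only: Flink assigns keys to subtasks through its own key-group
hashing, not String.hashCode(), and the "hot" key, the "#" separator and the
bucket count are hypothetical. It applies only to jobs that partition by key.

```java
import java.util.Random;

/** Toy model: appending a random salt spreads one hot key over several subtasks. */
final class KeySaltingModel {

    /** Returns the key with a random bucket suffix, e.g. "hot#7". */
    static String salt(String key, int buckets, Random rnd) {
        return key + "#" + rnd.nextInt(buckets);
    }

    /** Simplified stand-in for hash partitioning. */
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Counts how many distinct subtasks receive records of a single hot key. */
    static int distinctSubtasks(String hotKey, int buckets, int parallelism, int records, long seed) {
        Random rnd = new Random(seed);
        boolean[] hit = new boolean[parallelism];
        int distinct = 0;
        for (int i = 0; i < records; i++) {
            int subtask = subtaskFor(salt(hotKey, buckets, rnd), parallelism);
            if (!hit[subtask]) {
                hit[subtask] = true;
                distinct++;
            }
        }
        return distinct;
    }

    public static void main(String[] args) {
        System.out.println("1 bucket:   " + distinctSubtasks("hot", 1, 24, 10_000, 42L));
        System.out.println("24 buckets: " + distinctSubtasks("hot", 24, 24, 10_000, 42L));
    }
}
```

With one bucket every record of the key lands on the same subtask, while
more buckets spread it out. Any per-key aggregation then has to strip the
salt and merge the partial results in a later step.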
