You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Filip Karnicki <fi...@gmail.com> on 2022/03/26 13:10:47 UTC

Exactly-once sink sync checkpoint stacking time effect

Hi, I noticed that with each added (kafka) sink with exactly-once
guarantees, there looks to be a penalty of ~100ms in terms of sync
checkpointing time.

Would anyone be able to explain and/or point me in the right direction in
the source code so that I could understand why that is? Specifically, why
there appears to be a 100ms added for _each_ sink, and not a flat 100ms for
all sinks, potentially pointing to a sequential set of IO calls (wiiiild
guess)

I would be keen to understand if there's anything I could do (incl.
contributing code) that would parallelise this penalty in terms of sync
checkpointing time.

Alternatively, is there any setting that would help me bring the sync
checkpointing time down (and still get exactly-once guarantees)?

Many thanks,
Fil

Re: Exactly-once sink sync checkpoint stacking time effect

Posted by Filip Karnicki <fi...@gmail.com>.
Thank you very much for your answer.

I was able to reduce the number of sinks as you described. That helped a
lot, thank you.

I think you must be right with regards to (2) - opening a new transaction
being the culprit. It's unlikely to be (1) since this behaviour occurs even
when there are 0 messages going through a brand new, locally running kafka
cluster.

Kind regards,
Fil

On Tue, 29 Mar 2022 at 09:34, Arvid Heise <ar...@apache.org> wrote:

> Hi Filip,
>
> two things will impact sync time for Kafka:
> 1. Flushing all old data [1], in particular flushing all in-flight
> partitions [2]. However, that shouldn't cause a stacking effect except when
> the brokers are overloaded on checkpoint.
> 2. Opening a new transaction [3]. Since all transactions are linearized on
> the Kafka brokers, this is the most likely root cause. Note that aborted
> checkpoints may require multiple transactions to be opened. So you could
> check if you have them quite often aborted.
>
> If you want to know more, I suggest you attach a profiler and find the
> specific culprit and report back [4]. There is a low probability that the
> sink framework has a bug that causes this behavior. In that case, we can
> fix it more easily than if it's a fundamental issue with Kafka. In general,
> exactly-once and low latency are somewhat contradicting requirements, so
> there is only so much you can do.
>
> Not knowing your topology but maybe you can reduce the number of sinks?
> With the KafkaRecordSerializationSchema you can set different topics for
> different ProducerRecords of the same DataStream.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L190-L190
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L177-L183
> [3]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L302-L321
> [4]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/debugging/application_profiling/
>
>
> On Sat, Mar 26, 2022 at 2:11 PM Filip Karnicki <fi...@gmail.com>
> wrote:
>
>> Hi, I noticed that with each added (kafka) sink with exactly-once
>> guarantees, there looks to be a penalty of ~100ms in terms of sync
>> checkpointing time.
>>
>> Would anyone be able to explain and/or point me in the right direction in
>> the source code so that I could understand why that is? Specifically, why
>> there appears to be a 100ms added for _each_ sink, and not a flat 100ms for
>> all sinks, potentially pointing to a sequential set of IO calls (wiiiild
>> guess)
>>
>> I would be keen to understand if there's anything I could do (incl.
>> contributing code) that would parallelise this penalty in terms of sync
>> checkpointing time.
>>
>> Alternatively, is there any setting that would help me bring the sync
>> checkpointing time down (and still get exactly-once guarantees)?
>>
>> Many thanks,
>> Fil
>>
>

Re: Exactly-once sink sync checkpoint stacking time effect

Posted by Arvid Heise <ar...@apache.org>.
Hi Filip,

two things will impact sync time for Kafka:
1. Flushing all old data [1], in particular flushing all in-flight
partitions [2]. However, that shouldn't cause a stacking effect except when
the brokers are overloaded on checkpoint.
2. Opening a new transaction [3]. Since all transactions are linearized on
the Kafka brokers, this is the most likely root cause. Note that aborted
checkpoints may require multiple transactions to be opened. So you could
check if you have them quite often aborted.

If you want to know more, I suggest you attach a profiler and find the
specific culprit and report back [4]. There is a low probability that the
sink framework has a bug that causes this behavior. In that case, we can
fix it more easily than if it's a fundamental issue with Kafka. In general,
exactly-once and low latency are somewhat contradicting requirements, so
there is only so much you can do.

Not knowing your topology but maybe you can reduce the number of sinks?
With the KafkaRecordSerializationSchema you can set different topics for
different ProducerRecords of the same DataStream.

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L190-L190
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L177-L183
[3]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L302-L321
[4]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/debugging/application_profiling/


On Sat, Mar 26, 2022 at 2:11 PM Filip Karnicki <fi...@gmail.com>
wrote:

> Hi, I noticed that with each added (kafka) sink with exactly-once
> guarantees, there looks to be a penalty of ~100ms in terms of sync
> checkpointing time.
>
> Would anyone be able to explain and/or point me in the right direction in
> the source code so that I could understand why that is? Specifically, why
> there appears to be a 100ms added for _each_ sink, and not a flat 100ms for
> all sinks, potentially pointing to a sequential set of IO calls (wiiiild
> guess)
>
> I would be keen to understand if there's anything I could do (incl.
> contributing code) that would parallelise this penalty in terms of sync
> checkpointing time.
>
> Alternatively, is there any setting that would help me bring the sync
> checkpointing time down (and still get exactly-once guarantees)?
>
> Many thanks,
> Fil
>