Posted to user@flink.apache.org by Vishal Surana <vi...@moengage.com> on 2022/11/15 08:07:22 UTC

Kafka transactions & Flink checkpoints

I want to achieve exactly-once semantics in my job and want to make sure I understand the current behaviour correctly:

1. Only one Kafka transaction at a time (no concurrent checkpoints)
2. Only one transaction per checkpoint


My job has a very large amount of state (>100GB) and I have no option but to use unaligned checkpoints. With the above limitations, it seems to me that if the checkpoint interval is 1 minute and a checkpoint takes about 10 seconds to complete, then only one Kafka transaction can happen in 70 seconds. None of the output records will be visible until the transaction completes. This way a steady stream of inputs will result in a buffered output stream where data is only visible after a minute, thereby ruling out any sort of real-time streaming use case. Reducing the checkpoint interval is not really an option given the size of the checkpoint. The only way out would be to allow multiple transactions per checkpoint.

Thanks,
Vishal
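
As a rough sketch of the arithmetic in the message above (plain Python, not Flink code): assume the sink opens one transaction per checkpoint interval and commits it only once the checkpoint that closes that interval has completed. Under that assumption, a record written some seconds into the interval waits for the rest of the interval plus the checkpoint duration before a read-committed consumer can see it. The function name and parameters are hypothetical, chosen for illustration; the 60 s and 10 s defaults are the figures quoted in the question.

```python
def visibility_delay(offset_s, interval_s=60.0, checkpoint_duration_s=10.0):
    """Seconds until a record becomes visible to read-committed consumers.

    Assumption: the transaction covering this record is committed only when
    the checkpoint that ends the current interval has completed.
    offset_s is how far into the checkpoint interval the record was written.
    """
    if not 0.0 <= offset_s <= interval_s:
        raise ValueError("offset_s must lie within the checkpoint interval")
    remaining_interval = interval_s - offset_s
    return remaining_interval + checkpoint_duration_s


if __name__ == "__main__":
    print(visibility_delay(0.0))   # written right after a checkpoint starts: 70.0
    print(visibility_delay(30.0))  # written mid-interval: 40.0
    print(visibility_delay(60.0))  # written just before the next checkpoint: 10.0
```

With these numbers the delay ranges from about 10 to 70 seconds, so shortening either the interval or the checkpoint duration is what reduces output latency in this model.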

Re: Kafka transactions & Flink checkpoints

Posted by Yaroslav Tkachenko <ya...@goldsky.com>.
I gave a talk about that setup:
https://www.youtube.com/watch?v=tiGxEGPyqCg&ab_channel=FlinkForward

The documentation suggests using unaligned checkpoints in case of
backpressure (
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints).
I'd like to add that this typically means you have data skew, which makes
checkpoints take a long time to finalize, since different subtasks take
different amounts of time to finish. If you don't have data skew (your
state is uniformly distributed) and you don't have a lot of backpressure,
you shouldn't enable unaligned checkpoints IMO. So my suggestion is to
analyze your workload and try to eliminate any data skew you find.

Btw, what storage do you use for your RocksDB state? I suggest using the
fastest SSD you can get, e.g. on cloud vendors, use the ephemeral instance
storage rather than something like AWS EBS.
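
One possible way to start the skew analysis suggested above is to compare how many records each parallel subtask of an operator receives. The sketch below is plain Python with a hypothetical helper name and made-up sample counts; in practice the per-subtask numbers would have to come from your own job, for example from the per-subtask `numRecordsIn` metric in the Flink web UI.

```python
from statistics import mean


def skew_ratio(records_per_subtask):
    """Ratio of the busiest subtask's record count to the average count.

    A value near 1.0 suggests an even distribution; larger values mean one
    subtask handles a disproportionate share of the records.
    """
    if not records_per_subtask:
        raise ValueError("need at least one subtask count")
    average = mean(records_per_subtask)
    if average == 0:
        return 1.0  # no records anywhere, so nothing is skewed
    return max(records_per_subtask) / average


if __name__ == "__main__":
    even = [100, 100, 100, 100]  # made-up counts, evenly distributed
    hot_key = [400, 0, 0, 0]     # made-up counts, one hot subtask
    print(skew_ratio(even))      # 1.0
    print(skew_ratio(hot_key))   # 4.0
```

What ratio counts as problematic depends on the workload; the point is only to make uneven key distribution visible before deciding whether unaligned checkpoints are needed.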



On Wed, Nov 16, 2022 at 4:49 AM Vishal Surana <vi...@moengage.com> wrote:

> Yes. I do use RocksDB for (incremental) checkpointing. During each
> checkpoint 15-20GB of state gets created (new state added, some expired). I
> make use of FIFO compaction.
>
> I’m a bit surprised you were able to run with 10+TB state without
> unaligned checkpoints because the performance in my application degrades
> quite a lot. Can you share your checkpoint configurations?
>
>
> Thanks,
> Vishal

Re: Kafka transactions & Flink checkpoints

Posted by Vishal Surana <vi...@moengage.com>.
Yes. I do use RocksDB for (incremental) checkpointing. During each checkpoint 15-20GB of state gets created (new state added, some expired). I make use of FIFO compaction.

I’m a bit surprised you were able to run with 10+TB state without unaligned checkpoints because the performance in my application degrades quite a lot. Can you share your checkpoint configurations?


Thanks,
Vishal

On 15 Nov 2022, 10:07 PM +0530, Yaroslav Tkachenko <ya...@goldsky.com> wrote:
> Hi Vishal,
>
> Just wanted to comment on this bit:
>
> > My job has a very large amount of state (>100GB) and I have no option but to use unaligned checkpoints.
>
> I successfully ran Flink jobs with 10+ TB of state and no unaligned checkpoints enabled. Usually, you consider enabling them when there is some kind of skew in the topology, but IMO it's unrelated to the state size.
>
> > Reducing the checkpoint interval is not really an option given the size of the checkpoint
>
> Do you use RocksDB state backend with incremental checkpointing?

Re: Kafka transactions & Flink checkpoints

Posted by Yaroslav Tkachenko <ya...@goldsky.com>.
Hi Vishal,

Just wanted to comment on this bit:

> My job has a very large amount of state (>100GB) and I have no option but
> to use unaligned checkpoints.

I successfully ran Flink jobs with 10+ TB of state and no unaligned
checkpoints enabled. Usually, you consider enabling them when there is some
kind of skew in the topology, but IMO it's unrelated to the state size.

> Reducing the checkpoint interval is not really an option given the size
> of the checkpoint

Do you use RocksDB state backend with incremental checkpointing?
