Posted to user@flink.apache.org by Chesnay Schepler <ch...@apache.org> on 2021/02/01 10:37:02 UTC

Re: question on checkpointing

1) An operator that just blocks for a long time (for example, because it 
does a synchronous call to some external service) can indeed cause a 
checkpoint timeout.

2) What kind of effects are you worried about?

On 1/28/2021 8:05 PM, Marco Villalobos wrote:
> Is it possible that checkpointing times out due to an operator taking 
> too long?
>
> Also, does windowing affect the checkpoint barriers?



Re: question on checkpointing

Posted by Arvid Heise <ar...@apache.org>.
Hi Marco,

> Actually, perhaps I misworded it.  This particular checkpoint failure seems
> to occur in an operator that is flat mapping (it is actually a keyed
> processing function) a single blob data-structure into several hundred
> thousand elements (sometimes a million) that immediately flow into a sink.
> I am speculating that the sink writes to the database were taking too long
> and causing a checkpoint to fail, but I changed that sink into a print, and
> the checkpoint still failed, so it must be something else.

You actually hit one of the design flaws of checkpoints. The barrier can
only be interleaved between records. So in your case it would be something
like <input1>, <barrier>, <input2>.  Then the barrier has to wait until
input1 has been fully processed, which may take very long in your case.
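
To make the ordering concrete, here is a toy single-threaded model in plain
Java. It is not Flink code, and the class and method names are made up for
illustration. The only assumption it encodes is the one stated above: queue
items are handled strictly one after another, so a barrier queued behind a
record is reached only after that record's whole fan-out has been emitted.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class BarrierOrderingDemo {

    /**
     * Toy model of one task thread (hypothetical, not Flink internals).
     *
     * @return how many elements had been emitted when the barrier was handled
     */
    public static long emittedBeforeBarrier(int fanOutPerRecord) {
        Queue<String> input =
                new ArrayDeque<>(Arrays.asList("input1", "BARRIER", "input2"));
        long emitted = 0;
        long emittedAtBarrier = -1;

        while (!input.isEmpty()) {
            String item = input.poll();
            if ("BARRIER".equals(item)) {
                // Only reached once input1 has been processed completely.
                emittedAtBarrier = emitted;
            } else {
                // flatMap-style amplification: one record in, many elements out.
                for (int i = 0; i < fanOutPerRecord; i++) {
                    emitted++; // stands in for emitting one element downstream
                }
            }
        }
        return emittedAtBarrier;
    }

    public static void main(String[] args) {
        System.out.println(emittedBeforeBarrier(1_000_000)); // prints 1000000
    }
}
```

The barrier sees all one million elements of input1 already emitted; there is
no point inside the inner loop where it could have been handled earlier.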

> I don't know the deep details of Flink's internals, but I am speculating
> that the data between this operator and sink has to be checkpointed before
> the sink actually does something.

No data is being added to the checkpoint (unless you use unaligned
checkpoints, but note that even unaligned checkpoints don't help here).

The only easy solution is to increase the checkpoint timeout and live with
long checkpointing times.
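
As a minimal sketch of that easy solution, the timeout can be raised on the
job's checkpoint configuration. The interval and timeout values below are
illustrative assumptions, not recommendations, and the snippet needs the Flink
streaming dependency on the classpath to compile.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutConfig {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 5 minutes (illustrative value).
        env.enableCheckpointing(5 * 60 * 1000L);

        // Allow each checkpoint up to 30 minutes before it is cancelled
        // (illustrative value; pick one that covers the slowest record).
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);

        // ... define sources, operators, and sinks here, then call env.execute(...)
    }
}
```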

A slightly more elaborate solution would be to split the flatMap into two
parts (if this is possible), such that instead of amplifying the record by a
factor of 1M, you have two amplification steps of 1k each. Then you can use
unaligned checkpoints (please wait for the upcoming 1.12.2 for that) to get
quicker checkpoints, as each flatMap can be checkpointed once its 1k records
have been generated. An example would be the following: suppose you have a
record with a list of 1M entries. Instead of flattening it into 1M records
directly, you first create chunks with a size of 1k and then flatten these
chunks in a second step. So the pipeline is source -> flatMap to chunk ->
forward channel -> flatMap -> sink.
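
The two steps can be sketched in plain Java, independent of the Flink API. The
class and method names are hypothetical; in a real job each method body would
live in its own flatMap, wired up as in the pipeline above.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoStepFlatten {

    /** Step 1: split one large list into chunks of at most chunkSize entries. */
    public static <T> List<List<T>> toChunks(List<T> entries, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < entries.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, entries.size());
            // Copy the slice so each chunk is an independent record.
            chunks.add(new ArrayList<>(entries.subList(start, end)));
        }
        return chunks;
    }

    /** Step 2: flatten a single chunk into its individual entries. */
    public static <T> List<T> flattenChunk(List<T> chunk) {
        return new ArrayList<>(chunk);
    }

    public static void main(String[] args) {
        List<Integer> blob = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) {
            blob.add(i);
        }

        List<List<Integer>> chunks = toChunks(blob, 1_000);
        long total = 0;
        for (List<Integer> chunk : chunks) {
            total += flattenChunk(chunk).size();
        }
        // prints "1000 chunks, 1000000 records"
        System.out.println(chunks.size() + " chunks, " + total + " records");
    }
}
```

Each call now emits at most about 1k elements, so the longest stretch of work
on a single input record shrinks from 1M emissions to roughly 1k.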

Footnote: We often thought about allowing the user to suspend inside the
flatMap so that things like barrier processing can happen. Unfortunately it's
not that easy: snapshotting the operator state in this case is prone to fail,
because the user has to make sure not to hold any state-relevant data while
suspending, and that's hard to explain.



On Fri, Feb 5, 2021 at 2:31 PM David Anderson <da...@apache.org> wrote:

> I've seen checkpoints time out when using the RocksDB state backend with
> very large objects. The issue is that updating a ValueState<T> stored in
> RocksDB requires deserializing, updating, and then re-serializing that
> object -- and if that's some enormous collection type, that will be slow.
> In such cases it's much better to use ListState or MapState, if possible,
> or the filesystem state backend -- but the filesystem state backend will
> have to copy those objects during checkpointing, and will need plenty of
> memory.
>
> Checkpoint barriers are not held up by windows. When the barrier reaches
> the head of the input queue, a snapshot is taken of the window's current
> state, and the barrier is forwarded downstream.

Re: question on checkpointing

Posted by David Anderson <da...@apache.org>.
I've seen checkpoints time out when using the RocksDB state backend with
very large objects. The issue is that updating a ValueState<T> stored in
RocksDB requires deserializing, updating, and then re-serializing that
object -- and if that's some enormous collection type, that will be slow.
In such cases it's much better to use ListState or MapState, if possible,
or the filesystem state backend -- but the filesystem state backend will
have to copy those objects during checkpointing, and will need plenty of
memory.
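
A rough way to see the cost difference is the plain-Java sketch below. It is
not Flink code: Java serialization stands in for Flink's serializers, the
names are hypothetical, and it assumes that a list-style append only has to
serialize the new element. It compares the bytes touched when one element is
added to a collection stored as a single value with the bytes needed for the
new element alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;

public class StateUpdateCost {

    static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static ArrayList<Integer> deserialize(byte[] bytes) {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (ArrayList<Integer>) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Bytes read plus written when the whole collection is one stored value. */
    public static long bytesForSingleValueAppend(int existingElements) {
        ArrayList<Integer> initial = new ArrayList<>();
        for (int i = 0; i < existingElements; i++) {
            initial.add(i);
        }
        byte[] stored = serialize(initial);

        ArrayList<Integer> loaded = deserialize(stored); // read the whole value
        loaded.add(existingElements);                    // add one element
        byte[] rewritten = serialize(loaded);            // write the whole value back
        return (long) stored.length + rewritten.length;
    }

    /** Bytes written when only the new element is serialized (assumed append). */
    public static long bytesForPerElementAppend(int newElement) {
        return serialize(Integer.valueOf(newElement)).length;
    }

    public static void main(String[] args) {
        System.out.println("single value: " + bytesForSingleValueAppend(100_000) + " bytes");
        System.out.println("per element:  " + bytesForPerElementAppend(100_000) + " bytes");
    }
}
```

The single-value cost grows with the size of the collection on every update,
while the per-element cost stays constant.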

Checkpoint barriers are not held up by windows. When the barrier reaches
the head of the input queue, a snapshot is taken of the window's current
state, and the barrier is forwarded downstream.

On Fri, Feb 5, 2021 at 12:17 PM Robert Metzger <rm...@apache.org> wrote:

> By default, a checkpoint times out after 10 minutes. This means if not all
> operators are able to confirm the checkpoint, it will be cancelled.
>
> If you have an operator that is blocking for more than 10 minutes on a
> single record (because this record contains millions of elements that are
> written to an external system), then yes, this operator can cause your
> checkpoints to time out.

Re: question on checkpointing

Posted by Robert Metzger <rm...@apache.org>.
By default, a checkpoint times out after 10 minutes. This means if not all
operators are able to confirm the checkpoint, it will be cancelled.

If you have an operator that is blocking for more than 10 minutes on a
single record (because this record contains millions of elements that are
written to an external system), then yes, this operator can cause your
checkpoints to time out.

On Mon, Feb 1, 2021 at 5:26 PM Marco Villalobos <mv...@kineteque.com>
wrote:

> Actually, perhaps I misworded it.  This particular checkpoint failure seems
> to occur in an operator that is flat mapping (it is actually a keyed
> processing function) a single blob data-structure into several hundred
> thousand elements (sometimes a million) that immediately flow into a sink.
> I am speculating that the sink writes to the database were taking too long
> and causing a checkpoint to fail, but I changed that sink into a print, and
> the checkpoint still failed, so it must be something else.
>
> I don't know the deep details of Flink's internals, but I am speculating
> that the data between this operator and sink has to be checkpointed before
> the sink actually does something.

Re: question on checkpointing

Posted by Marco Villalobos <mv...@kineteque.com>.
Actually, perhaps I misworded it.  This particular checkpoint failure seems
to occur in an operator that is flat mapping (it is actually a keyed
processing function) a single blob data-structure into several hundred
thousand elements (sometimes a million) that immediately flow into a sink.
I am speculating that the sink writes to the database were taking too long
and causing a checkpoint to fail, but I changed that sink into a print, and
the checkpoint still failed, so it must be something else.

I don't know the deep details of Flink's internals, but I am speculating
that the data between this operator and sink has to be checkpointed before
the sink actually does something.

On Mon, Feb 1, 2021 at 2:37 AM Chesnay Schepler <ch...@apache.org> wrote:

> 1) An operator that just blocks for a long time (for example, because it
> does a synchronous call to some external service) can indeed cause a
> checkpoint timeout.
>
> 2) What kind of effects are you worried about?
>
> On 1/28/2021 8:05 PM, Marco Villalobos wrote:
> > Is it possible that checkpointing times out due to an operator taking
> > too long?
> >
> > Also, does windowing affect the checkpoint barriers?