Posted to dev@flink.apache.org by Stephan Ewen <se...@apache.org> on 2021/02/11 15:27:38 UTC

Re: [DISCUSS] FLIP-151: Incremental snapshots for heap-based state backend

Thanks, Roman, for publishing this design.

There seems to be quite a bit of overlap with FLIP-158 (generalized
incremental checkpoints).

I would give +1 to the effort if it is a pretty self-contained and closed
effort, meaning we don't expect it to need a ton of follow-ups beyond
common maintenance and small bug fixes. If it does require a lot of
follow-ups, then we end up splitting our work between this FLIP and
FLIP-158, which seems a bit inefficient.
What other committers would be involved to ensure the community can
maintain this?


The design looks fine, in general, with one question:

When persisting changes, you persist all changes that have a newer version
than the latest one confirmed by the JM.

Can you explain why it is done exactly like that? Alternatively, you could
keep the latest checkpoint ID for which the state backend successfully
persisted the diff to the checkpoint storage and created a state handle. For
each checkpoint, the state backend includes the state handles of all
involved chunks. That would be similar to the log-based approach in
FLIP-158.
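
To make the alternative concrete, a rough sketch of the kind of bookkeeping
this would mean on the task side (all names are made up, purely illustrative):

    import java.util.*;

    // Hypothetical task-side bookkeeping (not Flink API): the backend remembers
    // the last checkpoint for which it persisted a diff and created a handle,
    // and references all still-needed chunks in later checkpoints.
    class IncrementalSnapshotTracker {

        // stands in for Flink's state handle type in this sketch
        interface ChunkHandle {}

        private long lastPersistedCheckpointId = -1;
        private final NavigableMap<Long, ChunkHandle> chunksByCheckpointId = new TreeMap<>();

        void onDiffPersisted(long checkpointId, ChunkHandle chunk) {
            chunksByCheckpointId.put(checkpointId, chunk);
            lastPersistedCheckpointId = checkpointId;
        }

        // For a new checkpoint, reference every chunk still needed to rebuild the
        // state, instead of re-uploading everything newer than the last
        // JM-confirmed checkpoint.
        Collection<ChunkHandle> handlesForCheckpoint() {
            return new ArrayList<>(chunksByCheckpointId.values());
        }

        long getLastPersistedCheckpointId() {
            return lastPersistedCheckpointId;
        }
    }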

I have a suspicion that this is because the JM may have released the state
handle (and discarded the diff) for a checkpoint that succeeded on the task
but didn't succeed globally. So we cannot reference any state handle that
has been handed over to the JobManager, but is not yet confirmed.

This characteristic seems to be at the heart of much of the complexity;
the handling of removed keys also seems to be caused by it.
If we could change that assumption, the design would become simpler.

(Side note: I am wondering if this also impacts the FLIP-158 DSTL design.)

Best,
Stephan


On Sun, Nov 15, 2020 at 8:51 AM Khachatryan Roman <
khachatryan.roman@gmail.com> wrote:

> Hi Stefan,
>
> Thanks for your reply. Very interesting ideas!
> If I understand correctly, SharedStateRegistry will still be responsible
> for pruning the old state; for that, it will maintain some (ordered)
> mapping between StateMaps and their versions, per key group.
> I think one modification to this approach is needed to support journaling:
> for each entry, maintain the version at which it was last fully
> snapshotted, and use this version to find the minimum as you described
> above.
> I'm considering a better state cleanup and optimization of removals as the
> next step. Anyway, I will add it to the FLIP document.
>
> Thanks!
>
> Regards,
> Roman
>
>
> On Tue, Nov 10, 2020 at 12:04 AM Stefan Richter <stefanrichter83@gmail.com>
> wrote:
>
> > Hi,
> >
> > Very happy to see that the incremental checkpoint idea is finally
> > becoming a reality for the heap backend! Overall the proposal looks
> > pretty good to me. Just wanted to point out one possible improvement
> > from what I can still remember from my ideas back then: I think you can
> > avoid doing periodic full snapshots for consolidation. Instead, my
> > suggestion would be to track the version numbers you encounter while
> > you iterate a snapshot for writing it - and then you should be able to
> > prune all incremental snapshots that were performed with a version
> > number smaller than the minimum you find. To avoid the problem of very
> > old entries that never get modified, you could start spilling entries
> > with a certain age difference compared to the current map version, so
> > that eventually all entries for an old version are re-written to newer
> > snapshots. You can track the version up to which this was done in the
> > map, and then you can again let go of their corresponding snapshots
> > after a guaranteed time. So instead of having the burden of periodic
> > large snapshots, you can make every snapshot work a little bit on the
> > cleanup, and if you are lucky it might happen mostly by itself if most
> > entries are frequently updated. I would also consider making map
> > clearing a special event in your log and unticking the versions on this
> > event - this allows you to let go of old snapshots and saves you from
> > writing a log of antimatter entries. Maybe the ideas are still useful
> > to you.
> >
> > Best,
> > Stefan
> >
> > On 2020/11/04 01:54:25, Khachatryan Roman <k....@gmail.com> wrote:
> > > Hi devs,
> > >
> > > I'd like to start a discussion of FLIP-151: Incremental snapshots for
> > > heap-based state backend [1].
> > >
> > > The Heap backend, while being limited to state sizes that fit into
> > > memory, also has some advantages compared to the RocksDB backend:
> > > 1. Serialization once per checkpoint, not per state modification. This
> > > allows "squashing" updates to the same keys.
> > > 2. Shorter synchronous phase (compared to RocksDB incremental).
> > > 3. No need for sorting and compaction, no IO amplification and JNI
> > > overhead.
> > > This can potentially give higher throughput and efficiency.
> > >
> > > However, the Heap backend currently lacks incremental checkpoints.
> > > This FLIP aims to add initial support for them.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-151%3A+Incremental+snapshots+for+heap-based+state+backend
> > >
> > > Any feedback is highly appreciated.
> > >
> > > Regards,
> > > Roman
>

Re: [DISCUSS] FLIP-151: Incremental snapshots for heap-based state backend

Posted by Roman Khachatryan <ro...@apache.org>.
That's an interesting idea.

I guess we can decouple delegating the actual state cleanup from the
correctness issues. I don't see any reason why it can't be implemented
without changing the notifications (for FLIP-158, however, we'll probably
have to ask "random" TMs, because FLIP-158 adds state sharing across
operators as well, so finding the right TM for state removal can become
difficult).

I'm not sure about the correctness issue though (FLINK-21351). What happens
if, after upscaling, the JM asks one TM to discard the state, but the second
TM receives the notification with a delay? IIUC, it can still refer to a
discarded state.
I think we can prevent this by sending the earliest retained checkpoint ID
in the trigger RPC/barriers (instead of in notifications).

However, with multiple concurrent checkpoints, that still seems not enough,
because some other checkpoint can complete and cause an in-use checkpoint
to be subsumed. Delegation to the TM doesn't solve the problem because of
rescaling (and state sharing across operators).
I think we can solve it either by limiting max-concurrent-checkpoints to 1,
or by "locking" all the checkpoints that can still be in use (i.e.
num-retained-checkpoints counting from the earliest pending checkpoint).
For example, with num-retained-checkpoints=3:
| completed        | pending     |
| cp0 | cp1 | cp2  | cp3 | cp4   |
If cp3 and cp4 both specify cp0 as the earliest retained checkpoint, then
cp0 cannot be subsumed until cp4 is completed - even if cp3 is.
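
A rough sketch of the "locking" I mean, as JM-side bookkeeping (hypothetical
names, just to illustrate):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative JM-side guard for the "locking" idea (not actual Flink code).
    class SubsumptionGuard {

        // earliest retained checkpoint ID announced in each pending checkpoint's trigger
        private final Map<Long, Long> earliestRetainedByPending = new HashMap<>();

        void onCheckpointTriggered(long checkpointId, long earliestRetainedId) {
            earliestRetainedByPending.put(checkpointId, earliestRetainedId);
        }

        void onCheckpointFinished(long checkpointId) { // completed or aborted
            earliestRetainedByPending.remove(checkpointId);
        }

        // A completed checkpoint may only be subsumed once no pending checkpoint
        // still declares it (or an older one) as its earliest retained checkpoint.
        boolean canSubsume(long completedCheckpointId) {
            return earliestRetainedByPending.values().stream()
                    .allMatch(earliest -> earliest > completedCheckpointId);
        }
    }

In the example above, cp3 and cp4 both register cp0, so canSubsume(cp0) stays
false until both of them finish.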

Aborted checkpoints are different in that they are removed from the "tail".
The TM can't infer from the earliest retained checkpoint ID whether some
later changes could be removed. The solution would be to also add the
last-completed-checkpoint ID to the notifications/trigger-RPC/barriers.
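
To make this concrete, the extra trigger information could look roughly like
this (made-up shape, not an actual interface):

    // Hypothetical payload piggybacked on the checkpoint trigger / barrier
    // (illustrative only).
    final class CheckpointTriggerInfo {

        final long checkpointId;
        // oldest checkpoint the JM still retains - anything older is safe to clean up
        final long earliestRetainedCheckpointId;
        // newest checkpoint known to be complete - changes referenced only by later,
        // aborted checkpoints may have to be re-sent
        final long lastCompletedCheckpointId;

        CheckpointTriggerInfo(
                long checkpointId,
                long earliestRetainedCheckpointId,
                long lastCompletedCheckpointId) {
            this.checkpointId = checkpointId;
            this.earliestRetainedCheckpointId = earliestRetainedCheckpointId;
            this.lastCompletedCheckpointId = lastCompletedCheckpointId;
        }
    }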

To limit the FLIP-158 scope, I'd implement the last change and
limit max-concurrent-checkpoints to 1.

Regards,
Roman


Re: [DISCUSS] FLIP-151: Incremental snapshots for heap-based state backend

Posted by Stephan Ewen <se...@apache.org>.
Thanks for clarifying.

Concerning the JM-aborted checkpoints and state handles: I was thinking
about it the other day as well and was considering an approach like this:

The core idea is to move the cleanup from JM to TM. That solves two issues:

(1) The StateBackends / DSTL delete the artifacts themselves, meaning we
don't have to make assumptions about the state on the JM. Those assumptions
sound too fragile, with easy bugs as soon as they change slightly (see also
the bug with incremental checkpoint / savepoint data loss,
https://issues.apache.org/jira/browse/FLINK-21351)

(2) We do not need to clean up from one node. In the past, doing the
cleanup from one node (JM) has sometimes become a bottleneck.

To achieve that, we would need to extend the "notifyCheckpointComplete()"
RPC from the JM to the TM so that it includes both the ID of the completed
checkpoint and the ID of the earliest retained checkpoint. Then the TM can
clean up all artifacts from earlier checkpoints.
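
On the TM side that could look roughly like this (a sketch with made-up
names, not the real interfaces):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.List;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Sketch of the extended notification handling on the TM (hypothetical API).
    interface CheckpointNotifications {
        void notifyCheckpointComplete(long completedCheckpointId, long earliestRetainedCheckpointId);
    }

    class TaskLocalCleanup implements CheckpointNotifications {

        // artifacts (files, change-log segments, ...) keyed by the checkpoint that created them
        private final NavigableMap<Long, List<Closeable>> artifactsByCheckpoint = new TreeMap<>();

        @Override
        public void notifyCheckpointComplete(long completedCheckpointId, long earliestRetainedCheckpointId) {
            // everything strictly older than the earliest retained checkpoint can
            // no longer be referenced, so the TM deletes it itself
            NavigableMap<Long, List<Closeable>> obsolete =
                    artifactsByCheckpoint.headMap(earliestRetainedCheckpointId, false);
            obsolete.values().forEach(artifacts -> artifacts.forEach(TaskLocalCleanup::closeQuietly));
            obsolete.clear();
        }

        private static void closeQuietly(Closeable artifact) {
            try {
                artifact.close();
            } catch (IOException ignored) {
                // best-effort cleanup in this sketch
            }
        }
    }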

There are two open questions with that design:
(1) On restore, we need to communicate the state handles of the previous
checkpoints to the TM as well, so the TM again gets the full picture of all
state artifacts.
(2) On rescaling, we need to clarify which TM is responsible for releasing
a handle if it is mapped to multiple TMs. Otherwise we get double-delete
calls. That isn't a problem per se, just a bit less efficient.


Maybe we could think in that direction for the DSTL work?



Re: [DISCUSS] FLIP-151: Incremental snapshots for heap-based state backend

Posted by Roman Khachatryan <ro...@apache.org>.
Thanks for your reply, Stephan.

Yes, there is overlap between FLIP-151 and FLIP-158 as both
address incremental state updates. However, I think that FLIP-151 on top
of FLIP-158 increases efficiency by:

1. "Squashing" the changes made to the same key. For example, if some
counter was changed 10 times then FLIP-151 will send only the last value
(this allows to send AND store less data compared to FLIP-158)

2. Keeping in memory only the changed keys and not the values.
(this allows to reduce memory AND latency (caused by serialization +
copying on every update) compared to FLIP-158)

(1) can probably be implemented in FLIP-158, but not (2).
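
As a toy illustration of the difference (not the proposed implementation):

    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Toy sketch only: how the two approaches track ten updates to one counter.
    class ChangeTrackingSketch {

        // FLIP-151 style: remember only WHICH keys changed; the current value is
        // read from the heap state at checkpoint time, so ten updates to one key
        // end up as a single entry in the snapshot.
        final Set<String> dirtyKeys = new HashSet<>();

        // FLIP-158 style: append every update (key + new value) to a log, so ten
        // updates to one key become ten log records.
        final List<Map.Entry<String, Long>> changelog = new ArrayList<>();

        void onCounterUpdate(String key, long newValue) {
            dirtyKeys.add(key);                                          // FLIP-151
            changelog.add(new AbstractMap.SimpleEntry<>(key, newValue)); // FLIP-158
        }
    }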

I don't think there will be a lot of follow-up efforts and I hope
@Dawid Wysakowicz <dw...@apache.org>, @pnowojski
<pn...@apache.org>, Yuan Mei and probably
@Yu Li <ca...@gmail.com> will be able to join at different stages.

Regarding using only the confirmed checkpoints, you are right: the JM can
abort non-confirmed checkpoints and discard the state. FLIP-158 has
the same problem because the StateChangelog produces StateHandles that
can be discarded by the JM. Currently, potentially discarded
changes are re-uploaded in both FLIPs.

In FLIP-158 (or a follow-up), I planned to improve this part by:
1. Limiting max-concurrent-checkpoints to 1, and
2. Sending the last confirmed checkpoint ID in RPCs and barriers.
So at checkpoint time, the backend knows exactly which changes can be
included.

The handling of removed keys is not related to the aborted checkpoints: they
are needed on recovery to actually remove data from the previous snapshot.
In FLIP-158 it is similar: the ChangelogStateBackend has to encode
removal operations and send them to the StateChangelog (though no additional
data structure is required).
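
For illustration, a removal could simply be another record type in the log
(made-up format, not the actual FLIP-158 encoding):

    // Toy changelog record types: a removal is just another log entry, so
    // replaying the log on recovery also deletes the key from the state
    // restored from the last snapshot.
    abstract class ChangeRecord {
        final String key;

        ChangeRecord(String key) {
            this.key = key;
        }
    }

    class PutRecord extends ChangeRecord {
        final byte[] serializedValue;

        PutRecord(String key, byte[] serializedValue) {
            super(key);
            this.serializedValue = serializedValue;
        }
    }

    class RemoveRecord extends ChangeRecord {
        RemoveRecord(String key) {
            super(key);
        }
    }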

Regards,
Roman


>