You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Stephan Ewen (Jira)" <ji...@apache.org> on 2021/10/28 15:59:00 UTC

[jira] [Comment Edited] (FLINK-24611) Prevent JM from discarding state on checkpoint abortion

    [ https://issues.apache.org/jira/browse/FLINK-24611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17435505#comment-17435505 ] 

Stephan Ewen edited comment on FLINK-24611 at 10/28/21, 3:58 PM:
-----------------------------------------------------------------

+1 to the general idea of changing the state registry to not count the references, but track the latest reference.
 I think this is more robust all in all because it means we do not have to ensure we keep the increments and decrements strictly in sync.


Regarding the organization of the shared state registry, I would *advocate for option (1) (traversing the entries)* for multiple reasons:
 # I think the math above is ignoring the maintenance of the secondary index (by checkpoint ID).

(1) Every time a state entry is changed, you will need to remove it from the list of the previous checkpoint ID and add it to the list of the latest checkpoint ID. For that, you probably want a {{Set}} of state handles per checkpoint ID, not a {{List}}, so you can do efficient removes objects. If you add up that cost of changing the sets for each object update, you fund that it becomes quite expensive if we expect to carry the majority of state artifacts over to the next checkpoint every time.

I think that is a fair assumption to make (that we carry over the majority of artifacts), because if we aim for the very frequent checkpoint intervals, the RocksDB artifacts and a significant number of changelog artifacts will just have to go forward. So we end up touching many state entries anyways, and not just reading them, but also modifying two sets each time. Add to that maybe one or two set re-hash operations, and you end up touching more elements than the total number of elements (because several are touched multiple times).


(2) We can put the traversal and disposal into an async thread (in the I/O pool). That should at least unblock the JM/Scheduler thread.

In an initial version, we might still want to lock the map against concurrent modifications. But in the future, this could be even fully asynchronous.

Once the ID-to-be-eviceted is fixed, the pass to find entries with a checkpoint reference at most of that ID can be done asynchronously. There is an issue about concurrent modifications of the map, but this could be worked around, because the deletion pass if ok to miss some artifacts (which would be deleted then in the next pass). That way, we can completely decouple the incremental state release from the synchronized checkpointing work. Modulo maintaining the current backpressure from state disposal.

I would consider the fully async variant an optimization, though.


(3) We can also batch the passes over the state registry together in a future optimization, if we see individual passes consume to much. For example, we could say that one pass happens every second (scheduled by the timer thread) and this pass would then discard anything lower than the latest evicted checkpoint.


was (Author: stephanewen):
+1 to the general idea of changing the state registry to not count the references, but track the latest reference.
 I think this is more robust all in all because it means we do not have to ensure we keep the increments and decrements strictly in sync.

 

Regarding the organization of the shared state registry, I would *advocate for option (1) (traversing the entries)* for multiple reasons:
 # I think the math above is ignoring the maintenance of the secondary index (by checkpoint ID).

Every time a state entry is changed, you will need to remove it from the list of the previous checkpoint ID and add it to the list of the latest checkpoint ID. For that, you probably want a {{Set}} of state handles per checkpoint ID, not a {{List}}, so you can do efficient removes objects. If you add up that cost of changing the sets for each object update, you fund that it becomes quite expensive if we expect to carry the majority of state artifacts over to the next checkpoint every time.

I think that is a fair assumption to make (that we carry over the majority of artifacts), because if we aim for the very frequent checkpoint intervals, the RocksDB artifacts and a significant number of changelog artifacts will just have to go forward. So we end up touching many state entries anyways, and not just reading them, but also modifying two sets each time. Add to that maybe one or two set re-hash operations, and you end up touching more elements than the total number of elements (because several are touched multiple times).
 # We can put the traversal and disposal into an async thread (in the I/O pool). That should at least unblock the JM/Scheduler thread.

In an initial version, we might still want to lock the map against concurrent modifications. But in the future, this could be even fully asynchronous.

Once the ID-to-be-eviceted is fixed, the pass to find entries with a checkpoint reference at most of that ID can be done asynchronously. There is an issue about concurrent modifications of the map, but this could be worked around, because the deletion pass if ok to miss some artifacts (which would be deleted then in the next pass). That way, we can completely decouple the incremental state release from the synchronized checkpointing work. Modulo maintaining the current backpressure from state disposal.

I would consider the fully async variant an optimization, though.


 # We can also batch the passes over the state registry together in a future optimization, if we see individual passes consume to much. For example, we could say that one pass happens every second (scheduled by the timer thread) and this pass would then discard anything lower than the latest evicted checkpoint.

> Prevent JM from discarding state on checkpoint abortion
> -------------------------------------------------------
>
>                 Key: FLINK-24611
>                 URL: https://issues.apache.org/jira/browse/FLINK-24611
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> When a checkpoint is aborted, JM discards any state that was sent to it and wasn't used in other checkpoints. This forces incremental state backends to wait for confirmation from JM before re-using this state. For changelog backend this is even more critical.
> One approach proposed was to make backends/TMs responsible for their state, until it's not shared with other TMs, i.e. until rescaling (private/shared state ownership track: FLINK-23342 and more).
>  However, that approach is quite invasive.
>  
> An alternative solution would be:
>  1. SharedStateRegistry remembers the latest checkpoint for each shared state (instead of usage count currently)
>  2. CompletedCheckpointStore notifies it about the lowest valid checkpoint (on subsumption)
>  3. SharedStateRegistry then discards any state associated with the lower (subsumed/aborted) checkpoints
>  So the aborted checkpoint can only be discarded after some subsequent successful checkpoint (which can mark state as used).
> Only JM code is changed.
>  
> Implementation considerations.
> On subsumption, JM needs to find all the unused state and discard it.
>  This can either be done by
>  1) simply traversing all entries; or by 
>  2) maintaining a set of entries per checkpoint (e.g. SortedMap<Long, List<K>>). This allows to skip unnecessary traversal at the cost of higher memory usage
> In both cases:
>  - each entry stores last checkpoint ID it was used in (long)
>  - key is hashed (even with plain traversal, map.entrySet.iterator.remove() computes hash internally)
> Given the following constraints:
>  - 10M state entries at most
>  - 10 (retained) checkpoint at most
>  - 10 checkpoints per second at most
>  - state entry key takes 32B (usually UUID or two UUIDs)
> The extra space for (2) would be in order of 10M*32B=38Mb.
>  The extra time for (1) would be in order of 10M * 10 checkpoints per second * ratio of outdated entries per checkpoint. Depending on the ratio and the hardware, this could take up to hundreds of ms per second, blocking the main thread.
>  So approach (2) seems reasonable.
>  
> The following cases shouldn't pose any difficulties:
>  1. Recovery, re-scaling, and state used by not all or by no tasks - we still register all states on recovery even after FLINK-22483/FLINK-24086
>  2. PlaceholderStreamStateHandles
>  3. Cross-task state sharing - not an issue as long as everything is managed by JM
>  4. Dependencies between SharedStateRegistry and CompletedCheckpointStore - simple after FLINK-24086
>  5. Multiple concurrent checkpoints (below)
> Consider the following case:
> (nr. concurrent checkpoints > 1)
> 1. checkpoint 1 starts, TM reports that it uses file f1; checkpoint 1 gets aborted - f1 is now tracked
> 2. savepoint 2 starts, it *will* use f1
> 3. checkpoint 3 starts and finishes; it does NOT use file f1
> When a checkpoint finishes, all pending checkpoints before it are aborted - but not savepoints.
> Savepoints currently are NOT incremental. And in the future, incremental savepoints shouldn't share any artifacts with checkpionts.
> The following should be kept in mind:
>  1. On job cancellation, state of aborted checkpoints should be cleaned up explicitly
>  2. Savepoints should be ignored and not change CheckpointStore.lowestCheckpointID
>  
> For the end users, this change might render as a delay in discarding state of aborted checkpoints, which seems acceptable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)