Posted to issues@flink.apache.org by "Roman Khachatryan (Jira)" <ji...@apache.org> on 2021/10/21 12:04:00 UTC

[jira] [Created] (FLINK-24611) Prevent JM from discarding state on checkpoint abortion

Roman Khachatryan created FLINK-24611:
-----------------------------------------

             Summary: Prevent JM from discarding state on checkpoint abortion
                 Key: FLINK-24611
                 URL: https://issues.apache.org/jira/browse/FLINK-24611
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Checkpointing
    Affects Versions: 1.15.0
            Reporter: Roman Khachatryan
             Fix For: 1.15.0


When a checkpoint is aborted, the JM discards any state that was sent to it and wasn't used in other checkpoints. This forces incremental state backends to wait for confirmation from the JM before re-using this state. For the changelog backend this is even more critical.


 One proposed approach was to make backends/TMs responsible for their state as long as it is not shared with other TMs, i.e. until rescaling (private/shared state ownership tracking).
 However, that approach is quite invasive.

An alternative solution would be:
 1. SharedStateRegistry remembers the latest checkpoint for each shared state (instead of the usage count it keeps currently)
 2. CompletedCheckpointStore notifies it about the lowest valid checkpoint (on subsumption)
 3. SharedStateRegistry then discards any state associated with the lower (subsumed/aborted) checkpoints
 So the aborted checkpoint can only be discarded after some subsequent successful checkpoint (which can mark state as used).
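
The three steps above can be sketched roughly as follows. This is a minimal illustration, not Flink's actual SharedStateRegistry: the class name LastUseRegistrySketch, the String keys, and both method names are assumptions made for this example. It uses plain traversal on subsumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch only; names and signatures are not taken from Flink. */
class LastUseRegistrySketch {
    // state key -> ID of the latest checkpoint that registered this state
    private final Map<String, Long> lastUsedBy = new HashMap<>();

    /** Called for every shared state a checkpoint reports, whether it later completes or aborts. */
    void register(String stateKey, long checkpointId) {
        lastUsedBy.merge(stateKey, checkpointId, Math::max);
    }

    /**
     * Called on subsumption with the lowest still-valid checkpoint ID.
     * Returns the keys whose state can now be discarded.
     */
    List<String> unregisterUnusedState(long lowestValidCheckpointId) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedBy.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < lowestValidCheckpointId) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
        return discarded;
    }
}
```

In this sketch, aborting a checkpoint triggers nothing by itself. State registered by an aborted checkpoint 2 survives if a later successful checkpoint 3 registers it again, and is dropped otherwise once checkpoint 3 becomes the lowest valid one.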

Only JM code is changed.

 

Implementation considerations.

On subsumption, JM needs to find all the unused state and discard it.
 This can either be done by
 1) simply traversing all entries; or by 
 2) maintaining a set of entries per checkpoint (e.g. SortedMap<Long, List<K>>). This allows skipping unnecessary traversal at the cost of higher memory usage

In both cases:
 - each entry stores the ID of the last checkpoint it was used in (long)
 - the key is hashed (even with plain traversal, map.entrySet().iterator().remove() computes the hash internally)
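
Approach (2) could look roughly like the sketch below. It is an assumption-laden illustration rather than the actual implementation: the class IndexedRegistrySketch and its methods are hypothetical, and it uses a Set<K> per checkpoint instead of the List<K> mentioned above so that moving a key to a newer checkpoint is cheap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical sketch only; names and signatures are not taken from Flink. */
class IndexedRegistrySketch<K> {
    // state key -> ID of the last checkpoint it was used in
    private final Map<K, Long> lastUsedBy = new HashMap<>();
    // checkpoint ID -> keys whose last use is that checkpoint
    private final TreeMap<Long, Set<K>> keysByCheckpoint = new TreeMap<>();

    void register(K key, long checkpointId) {
        Long previous = lastUsedBy.get(key);
        if (previous != null && previous >= checkpointId) {
            return; // already recorded for this or a newer checkpoint
        }
        if (previous != null) {
            // move the key out of the older checkpoint's set
            Set<K> old = keysByCheckpoint.get(previous);
            old.remove(key);
            if (old.isEmpty()) {
                keysByCheckpoint.remove(previous);
            }
        }
        lastUsedBy.put(key, checkpointId);
        keysByCheckpoint.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(key);
    }

    /** Visits only entries last used by checkpoints below the given ID. */
    List<K> unregisterUnusedState(long lowestValidCheckpointId) {
        // headMap is a view of all checkpoint IDs strictly below the bound
        SortedMap<Long, Set<K>> outdated = keysByCheckpoint.headMap(lowestValidCheckpointId);
        List<K> discarded = new ArrayList<>();
        for (Set<K> keys : outdated.values()) {
            for (K key : keys) {
                lastUsedBy.remove(key);
                discarded.add(key);
            }
        }
        outdated.clear(); // clearing the view removes the entries from keysByCheckpoint
        return discarded;
    }

    int size() {
        return lastUsedBy.size();
    }
}
```

The cost of a subsumption call here is proportional to the number of outdated entries rather than to the total number of entries, at the price of the second index.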

Given the following constraints:
 - 10M state entries at most
 - 10 (retained) checkpoints at most
 - 10 checkpoints per second at most
 - state entry key takes 32B (usually UUID or two UUIDs)

The extra space for (2) would be on the order of 10M * 32 B = 320 MB (about 305 MiB).
 The extra time for (1) would be on the order of 10M entries * 10 checkpoints per second * the ratio of outdated entries per checkpoint. Depending on the ratio and the hardware, this could take up to hundreds of ms per second, blocking the main thread.
 So approach (2) seems reasonable.

The following cases shouldn't pose any difficulties:
 1. Recovery, re-scaling, and state used by only some or by none of the tasks - we still register all states on recovery even after FLINK-22483/FLINK-24086
 2. PlaceholderStreamStateHandles
 3. Cross-task state sharing - not an issue as long as everything is managed by JM
 4. Dependencies between SharedStateRegistry and CompletedCheckpointStore - simple after FLINK-24086

The following should be kept in mind:
 1. On job cancellation, state of aborted checkpoints should be cleaned up explicitly
 2. Savepoints should be ignored and not change CheckpointStore.lowestCheckpointID



--
This message was sent by Atlassian Jira
(v8.3.4#803005)