Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/01 10:55:00 UTC

[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

    [ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348381#comment-16348381 ] 

ASF GitHub Bot commented on FLINK-5820:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5396#discussion_r165295768
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---
    @@ -18,45 +18,50 @@
     
     package org.apache.flink.runtime.checkpoint;
     
    +import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.runtime.jobgraph.JobStatus;
     
     import java.io.Serializable;
     
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
     /**
    - * The configuration of a checkpoint, such as whether
    + * The configuration of a checkpoint. This described whether
      * <ul>
    - *     <li>The checkpoint should be persisted</li>
    - *     <li>The checkpoint must be full, or may be incremental (not yet implemented)</li>
    - *     <li>The checkpoint format must be the common (cross backend) format,
    - *     or may be state-backend specific (not yet implemented)</li>
    - *     <li>when the checkpoint should be garbage collected</li>
    + *     <li>The checkpoint is s regular checkpoint or a savepoint</li>
    + *     <li>When the checkpoint should be garbage collected</li>
      * </ul>
      */
     public class CheckpointProperties implements Serializable {
     
    -	private static final long serialVersionUID = -8835900655844879470L;
    +	private static final long serialVersionUID = 2L;
     
    -	private final boolean forced;
    +	/** Type - checkpoit / savepoint. */
    --- End diff --
    
    nit: typo


> Extend State Backend Abstraction to support Global Cleanup Hooks
> ----------------------------------------------------------------
>
>                 Key: FLINK-5820
>                 URL: https://issues.apache.org/jira/browse/FLINK-5820
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of state is only meaningful in the context of its state handle. There is no possibility of a view onto "all state associated with checkpoint X".
> That causes several issues:
>   - State might not be cleaned up in the process of failures. When a TaskManager hands over a state handle to the JobManager and either of them has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily and the checkpoint metadata was already disposed.
>   - State cleanup is more expensive than necessary in many cases. Each state handle is individually released. For large jobs, this means 1000s of release operations (typically file deletes) per checkpoint, which can be expensive on some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes the responsibility of the state backend, not the "completed checkpoint store". The latter only stores the pointers to the latest available checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed state that belongs to only one specific checkpoint (shared state comes as part of incremental checkpointing).
>   5. The StateBackend needs to have a hook to drop all checkpointed state up to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which currently stores most of the checkpointed state (transitively for RocksDB as well), this means a re-structuring of the storage directories as follows:
> {code}
> ../<flink-checkpoints>/job1-id/
>                               /shared/    <-- shared checkpoint data
>                               /chk-1/...  <-- data exclusive to checkpoint 1
>                               /chk-2/...  <-- data exclusive to checkpoint 2
>                               /chk-3/...  <-- data exclusive to checkpoint 3
> ../<flink-checkpoints>/job2-id/
>                               /shared/...
>                               /chk-1/...
>                               /chk-2/...
>                               /chk-3/...
> ../<flink-savepoints>/savepoint-1/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
>                      /savepoint-2/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.
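
The two cleanup hooks from items 4 and 5, applied to the per-job directory layout quoted above, could look roughly like the sketch below. This is only an illustration, not Flink's actual API: the class name CheckpointDirLayout and all of its methods are hypothetical, it works on a local file system through java.nio.file, and it assumes that "up to a specific checkpoint" includes that checkpoint. The shared/ directory is deliberately left alone, since its contents may be referenced by several checkpoints.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of the proposed layout and cleanup hooks; not Flink code. */
public class CheckpointDirLayout {

    /** Per-job directory, i.e. <flink-checkpoints>/<job-id>. */
    private final Path jobDir;

    public CheckpointDirLayout(Path checkpointRoot, String jobId) {
        this.jobDir = checkpointRoot.resolve(jobId);
    }

    /** Directory for state shared between checkpoints. */
    public Path sharedDir() {
        return jobDir.resolve("shared");
    }

    /** Directory for state that belongs to exactly one checkpoint. */
    public Path exclusiveDir(long checkpointId) {
        return jobDir.resolve("chk-" + checkpointId);
    }

    /** Creates shared/ and chk-<id>/ if missing and returns chk-<id>/. */
    public Path createCheckpointDir(long checkpointId) throws IOException {
        Files.createDirectories(sharedDir());
        return Files.createDirectories(exclusiveDir(checkpointId));
    }

    /** Item 4: drop all state that belongs only to one specific checkpoint. */
    public void dropCheckpoint(long checkpointId) throws IOException {
        deleteRecursively(exclusiveDir(checkpointId));
    }

    /**
     * Item 5: drop the exclusive state of every checkpoint whose id is
     * less than or equal to upTo (inclusive bound is an assumption).
     */
    public void dropCheckpointsUpTo(long upTo) throws IOException {
        if (!Files.isDirectory(jobDir)) {
            return;
        }
        List<Path> children;
        try (Stream<Path> listing = Files.list(jobDir)) {
            children = listing.collect(Collectors.toList());
        }
        for (Path child : children) {
            String name = child.getFileName().toString();
            if (!name.startsWith("chk-")) {
                continue; // skips shared/ and anything unrelated
            }
            long id;
            try {
                id = Long.parseLong(name.substring("chk-".length()));
            } catch (NumberFormatException e) {
                continue; // not a checkpoint directory
            }
            if (id <= upTo) {
                deleteRecursively(child);
            }
        }
    }

    /** Deletes a directory tree, children before parents. */
    private static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```

With this shape, discarding a checkpoint is one recursive delete of its chk-<id> directory rather than one release call per state handle, and the parent directory is removed along with its files. On a distributed file system the recursive delete would presumably go through that file system's own API instead of java.nio.file.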



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)