Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/02/17 13:56:41 UTC

[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

    [ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871885#comment-15871885 ] 

ASF GitHub Bot commented on FLINK-5820:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/3346

    Selfcontained

    This is based on #3345, and only the last two commits are relevant here. I've separated the test changes (last commit) from the main changes (second-to-last commit) for better reviewability; I would squash them before merging. The change looks more involved than it actually is: it is mostly routing new information along with the checkpoint barriers, which touches a lot of places.
    
    The main change is to add `CheckpointOptions` to the triggered checkpoint messages (coordinator to barrier-injecting tasks) and to the barriers (flowing inline with the data):
    
    ```java
    public class CheckpointOptions {
    	
    	// Type of checkpoint
    	// => FULL_CHECKPOINT
    	// => SAVEPOINT
    	@NonNull
    	CheckpointType getCheckpointType();
    
    	// Custom target location. This is a String, because for future
    	// backends it can be a logical location like a DB table.
    	@Nullable
    	String getTargetLocation();
    
    }
    ```
    
    This class would be the place to define more options for performing checkpoints (for example, for incremental checkpoints). @StephanEwen was involved in the design of incremental checkpoints and could probably comment best on whether this is in line with that design.
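
    For example, the two checkpoint types could be expressed like this when triggering (hypothetical factory methods, purely for illustration):

    ```java
    // regular, periodically triggered checkpoint: default type, no custom target
    CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();

    // user-triggered savepoint with a custom target location
    CheckpointOptions savepoint = CheckpointOptions.forSavepoint("hdfs:///flink/savepoints");
    ```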
    
    These options are forwarded via the `StreamTask` to the `StreamOperator`s and `Snapshotable` backends. The `AbstractStreamOperator` checks the options and either
    i) forwards the shared per-operator `CheckpointStreamFactory` (as of #3312), or
    ii) creates a custom savepoint stream factory (one per savepoint), as in the sketch below.
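
    A rough sketch of that branching (illustrative only; apart from `CheckpointOptions` and `createSavepointStreamFactory`, the field and method names here are my assumptions, not the actual `AbstractStreamOperator` code):

    ```java
    CheckpointStreamFactory factory;
    if (checkpointOptions.getCheckpointType() == CheckpointType.SAVEPOINT) {
        // savepoint: one dedicated factory per savepoint, writing to the custom target location
        factory = stateBackend.createSavepointStreamFactory(
                jobId, operatorIdentifier, checkpointOptions.getTargetLocation());
    } else {
        // regular checkpoint: reuse the shared per-operator stream factory (as of #3312)
        factory = sharedCheckpointStreamFactory;
    }
    ```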
    
    For this, the state backends provide the following new method:
    
    ```java
    CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
    ```
    
    The `MemoryStateBackend` returns the regular stream factory and the `FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all checkpoint streams to a single directory (instead of the regular sub folders per checkpoint).
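
    For illustration, a minimal sketch of how the two backends could implement this (the parameter names, the thrown exception, the `createStreamFactory` call, and the `FsSavepointStreamFactory` constructor arguments are assumptions on my side):

    ```java
    // MemoryStateBackend: a savepoint simply goes through the regular stream factory.
    @Override
    public CheckpointStreamFactory createSavepointStreamFactory(
            JobID jobId, String operatorIdentifier, String targetLocation) throws IOException {
        return createStreamFactory(jobId, operatorIdentifier);
    }

    // FsStateBackend: all streams of one savepoint go into the single target directory
    // (instead of the regular per-checkpoint sub folders).
    @Override
    public CheckpointStreamFactory createSavepointStreamFactory(
            JobID jobId, String operatorIdentifier, String targetLocation) throws IOException {
        return new FsSavepointStreamFactory(new Path(targetLocation), jobId, fileStateSizeThreshold);
    }
    ```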
    
    We end up with the following directory layout for savepoints:
    
    ```
    +---------------------------+
    | :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
    +---------------------------+
      | +---------------------------------------+
      +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
        +---------------------------------------+
          |
          +- _metadata (one per savepoint)
          +- :uuid (one data file per StreamTask)
          +- ...
          +- :uuid
    ```
    
    I decided to include a prefix of the job ID in the savepoint directory name, because I think this could be helpful when mapping savepoints back to jobs, which is a manual task.
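
    For example, the directory name could be assembled roughly like this (hypothetical helper code, not part of this PR; the suffix length is a guess):

    ```java
    // savepoint-:jobId(0, 6)-:random_suffix, as in the layout above
    String dirName = String.format(
            "savepoint-%s-%s",
            jobId.toString().substring(0, 6),              // short job ID prefix for manual mapping
            UUID.randomUUID().toString().substring(0, 8)); // random suffix to avoid collisions
    Path savepointDirectory = new Path(rootSavepointDirectory, dirName);
    ```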
    
    It's important to make sure that this is in line with the upcoming changes for incremental checkpoints (discussion on the mailing list) and FLINK-5820.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink selfcontained

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3346.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3346
    
----
commit 64e1f8892597dd26489774fe494fd7b211d789a9
Author: Ufuk Celebi <uc...@apache.org>
Date:   2017-02-15T16:52:40Z

    [FLINK-5763] [checkpoints] Acknowledge with explicit ID and CheckpointMetrics
    
    Instead of acknowledging checkpoints with the CheckpointMetaData, make
    the acknowledgement explicit with the checkpoint ID and CheckpointMetrics.
    The rest is not needed.
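
    Roughly, the acknowledgement changes from passing the whole meta data object to passing only what is needed (the signatures below are sketched from this description, not the exact Flink API):

    ```java
    // before: the full CheckpointMetaData travels back with the acknowledgement
    void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData);

    // after: only the checkpoint ID and the collected metrics are reported back
    void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics);
    ```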

commit 0b8fa02a150ae6d5891e04d1fce6de748a283aaf
Author: Ufuk Celebi <uc...@apache.org>
Date:   2017-02-15T17:16:44Z

    [FLINK-5763] [checkpoints] Move CheckpointMetrics out of CheckpointMetaData

commit d023159d0fef1da24373d7880e11d0a36afbd7ef
Author: Ufuk Celebi <uc...@apache.org>
Date:   2017-02-16T15:52:32Z

    [FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties

commit 2496068371bf1aac9e2ce6223002c1d9a043930f
Author: Ufuk Celebi <uc...@apache.org>
Date:   2017-02-16T16:56:23Z

    [FLINK-5763] [checkpoints] Add CheckpointOptions
    
    Adds `CheckpointOptions` to the triggered checkpoint messages (coordinator
    to barrier-injecting tasks) and barriers (flowing inline with the data):
    
    ```java
    public class CheckpointOptions {
    
      // Type of checkpoint
      // => FULL_CHECKPOINT
      // => SAVEPOINT
      @NonNull
      CheckpointType getCheckpointType();
    
      // Custom target location. This is a String, because for future
      // backends it can be a logical location like a DB table.
      @Nullable
      String getTargetLocation();
    
    }
    ```
    
    This class would be the place to define more options for performing the
    checkpoints (for example for incremental checkpoints).
    
    These options are forwarded via the `StreamTask` to the `StreamOperator`s and
    `Snapshotable` backends. The `AbstractStreamOperator` checks the options and
    either i) forwards the shared per-operator `CheckpointStreamFactory` (as of
    #3312), or ii) creates a custom savepoint stream factory (one per savepoint).
    
    For this, the state backends provide the following new method:
    
    ```
    CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
    ```
    
    The `MemoryStateBackend` returns the regular stream factory and the
    `FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all
    checkpoint streams to a single directory (instead of the regular sub folders
    per checkpoint).
    
    We end up with the following directory layout for savepoints:
    
    ```
    +---------------------------+
    | :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
    +---------------------------+
      | +---------------------------------------+
      +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
        +---------------------------------------+
           |
           +- _metadata (one per savepoint)
           +- :uuid (one data file per StreamTask)
           +- ...
           +- :uuid
    ```

commit ca80d68ab7faba663f4c384156a90d58de0ebf82
Author: Ufuk Celebi <uc...@apache.org>
Date:   2017-02-16T16:56:37Z

    [FLINK-5763] [checkpoints] Adjust tests

----


> Extend State Backend Abstraction to support Global Cleanup Hooks
> ----------------------------------------------------------------
>
>                 Key: FLINK-5820
>                 URL: https://issues.apache.org/jira/browse/FLINK-5820
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> The current state backend abstraction has the limitation that each piece of state is only meaningful in the context of its state handle. There is no possibility of a view onto "all state associated with checkpoint X".
> That causes several issues:
>   - State might not be cleaned up in the presence of failures. When a TaskManager hands over a state handle to the JobManager and either of them has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily and the checkpoint metadata was already disposed.
>   - State cleanup is more expensive than necessary in many cases. Each state handle is individually released. For large jobs, this means 1000s of release operations (typically file deletes) per checkpoint, which can be expensive on some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes the responsibility of the state backend, not the "completed checkpoint store". The latter only stores pointers to the latest available checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed state that belongs to only one specific checkpoint (shared state comes as part of incremental checkpointing).
>   5. The StateBackend needs to have a hook to drop all checkpointed state up to a specific checkpoint (for all previously discarded checkpoints); a hypothetical sketch of hooks 4 and 5 follows after this list.
>   6. In the future, this must support periodic cleanup hooks that track orphaned shared state from incremental checkpoints.
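> A minimal, hypothetical sketch of what hooks 4 and 5 could look like on the state backend (interface and method names are illustrative, not a final API):
> {code}
> public interface GlobalCheckpointCleanupHooks {
> 
>     // point 4: drop all state that belongs exclusively to the given checkpoint
>     // (shared state from incremental checkpoints is left alone)
>     void disposeCheckpoint(JobID jobId, long checkpointId) throws Exception;
> 
>     // point 5: drop all state up to (and including) the given checkpoint,
>     // covering previously discarded checkpoints as well
>     void disposeCheckpointsUpTo(JobID jobId, long checkpointId) throws Exception;
> }
> {code}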
> For the {{FsStateBackend}}, which currently stores most of the checkpointed state (transitively for RocksDB as well), this means restructuring the storage directories as follows:
> {code}
> ../<flink-checkpoints>/job1-id/
>                               /shared/    <-- shared checkpoint data
>                               /chk-1/...  <-- data exclusive to checkpoint 1
>                               /chk-2/...  <-- data exclusive to checkpoint 2
>                               /chk-3/...  <-- data exclusive to checkpoint 3
> ../<flink-checkpoints>/job2-id/
>                               /shared/...
>                               /chk-1/...
>                               /chk-2/...
>                               /chk-3/...
> ../<flink-savepoints>/savepoint-1/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
>                      /savepoint-2/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)