You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/12/04 13:05:11 UTC

[jira] [Commented] (FLINK-2976) Save and load checkpoints manually

    [ https://issues.apache.org/jira/browse/FLINK-2976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15041493#comment-15041493 ] 

ASF GitHub Bot commented on FLINK-2976:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1434

    [FLINK-2976] Allow to trigger checkpoints manually

    This PR contains **documentation**: https://github.com/uce/flink/blob/2976-savepoints/docs/apis/savepoints.md
    
    **In a nutshell**, savepoints `(*)` are **manually triggered checkpoints**, which take a snapshot of a program and write it out to an external state backend. This allows you to stop and resume your program without loosing intermediate state.
    
    **Why is this nice?** Because you don't have to replay everything when you redeploy your long running streaming job after changing it or updating to a newer Flink version.
    
    `(*)` Initially I wrote it as sa**F**epoints, but then settled on sa**V**epoints after stubmling across a related feature in [an Oracle SQL reference](https://docs.oracle.com/cd/B19306_01/appdev.102/b14261/savepoint_statement.htm). What do you think? :smile: http://doodle.com/poll/2z2cp9hxu7eucdsz
    
    ## Example
    
    Start your stateful streaming program via `flink/bin run ...`.
    
    ```
    $ bin/flink list
    ------------------ Running/Restarting Jobs -------------------
    04.12.2015 13:51:10 : 46da86f25ca8daa1bbff8ccae64d53af : Flink Streaming Job (RUNNING)
    --------------------------------------------------------------
    ```
    
    Wait for some checkpoints to complete:
    
    ```
    $ tail -f log/flink-hadoop-client-uce-m.log
    ...
    13:50:59,806 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 46da86f25ca8daa1bbff8ccae64d53af (Flink Streaming Job) changed to RUNNING.
    ...
    13:55:37,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1449150937225
    13:55:37,581 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1
    13:55:42,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1449150942225
    13:55:42,328 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2
    ...
    13:56:27,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 11 @ 1449150987225
    13:56:27,237 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 11
    ...
    ```
    
    Trigger a savepoint and cancel the job:
    
    ```
    $ bin/flink savepoint 46da86f25ca8daa1bbff8ccae64d53af
    Triggering savepoint for job 46da86f25ca8daa1bbff8ccae64d53af. Waiting for response...
    Savepoint completed. Path: jobmanager://savepoints/1
    You can resume your program from this savepoint with the run command.
    
    $ bin/flink cancel 46da86f25ca8daa1bbff8ccae64d53af
    ```
    
    Now you can restart the program from the savepoint:
    
    ```
    $ bin/flink run --fromSavepoint jobmanager://savepoints/1 ...
    ```
    
    This will resume the application from the state of the savepoint.
    
    ## Changes to Flink
    
    I focussed on **not changing any major Flink component** for this. Savepoints use the same checkpointing mechanism as the periodic checkpoints with some plumbing around it.
    
    ### Savepoint coordinator
    
    In addition to the `CheckpointCoordinator`, we add another instance of the `CheckpointCoordinator` called `SavepointCoordinator`. This class extends the regular coordinator and registers some callbacks on shutdown, fully ack'ed checkpoint, and cancelled checkpoint. For this, I've added three callback methods to the checkpoint coordinator, which are overwritten by the savepoint coordinator. With two separate coordinators, periodic checkpoints and savepoints don't interfere with each other.
    
    The savepoint coordinator manages a map of `checkpoint ID => future`. The futures are completed when the checkpoint is ack'ed or cancelled (or the coordinator shuts down altogether).
    
    #### Restoring
    
    Restore happens on job submission if a savepoint path is provided in the `JobSnapshottingSettings`. The restore mechanism is similar to the regular checkpoint restore, but with some further sanity checks to ensure that the state to task mapping is correct (see below). All state has to be mapped to the restored program.
    
    ### JobManagerMessages
    
    Added `TriggerSavepoint(JobID)` and `DisposeSavepoint(String)` Akka messages to the job manager. They trigger and dispose the savepoints respectively. These operations work asynchronously and respond the the request when the savepoint futures complete. The requests are triggered by the user (see CLI frontend).
     
    ### Hashing of StreamNodes
    
    The state to task mapping of checkpoints happens via `(jobVertexID, subtaskIndex)`. With this change, the jobVertexIDs of streaming programs are generated deterministically with respect to the structure of the program. This is needed to make sure that a restore with a new program can map the savepoint state to the tasks.
    
    The hash starts from the sources and takes multiple things into account:
    - parallelism
    - user function class
    - hash of the input
    - hash of the outputs
    - stream node ID
    
    The automatic generations makes sure that you can just use the savepoints, but it is actually *not recommended*, because you cannot change the program in any meaningful way (except changing the user function internals).
    
    That's why the **recommended option** is to specify a unique ID as input to the hasher on the DataStream API level:
    
    ```
    DataStream<String> stream = env.
      // Stateful source (e.g. Kafka) with ID
      .addSource(new StatefulSource()).uid("source-id")
      .shuffle()
      // The stateful mapper with ID
      .map(new StatefulMapper()).uid("mapper-id")
    
    // Stateless sink (no specific ID required)
    stream.print()
    ```
    
    If you give IDs to all stateful operators, you can happily re-arrange and change the topology (except for parallelism, see below).
    
    ### Application ID and DbStateBackend
    
    Savepoints are pairs of `(ApplicationID, CompletedCheckpoint)`. I've added a new `ApplicationID` to allow scoping tasks across multiple job submissions (which have changing job IDs). This is for example required to restore state in the `DbStateBackend`. After consulting with @gyfora I've changed all references to `JobID` in `DbStateBackend` to  `ApplicationID`.
    
    The ApplicationID is assigned in the `ExecutionGraph` only and is reset to the application ID of the savepoint if there is a restore operation. This touches almost nothing of the existing code and it is only propagated to the `TaskDeploymentDescriptor` and `Environment` of the `Task` instances.
    
    ### State storage
    
    The state storage **does not** instantiate the regular state backend on the job manager. It is essentially a set of a few helper classes, which allow to put and get some state to the file system or the job manager's heap. I think this is fine for now, because I didn't want to make changes to the central state abstractions, which are kind of in flux right now. But we should think about it in the future.
    
    ### Configuration and CLIFrontend
    
    This works out of the box if the job is using checkpointing. The default state backend for savepoints is `jobmanager`, which allows to stop and restore a program while the same job manager is running.
    
    For configuration, there are two new keys:
    ```
    state.backend.savepoints
    state.backend.savepoints.fs.dir
    ```
    If you don't specify these, the regular state backend configuration is used with the `jobmanager` as a fallback if no viable config is found.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 2976-savepoints

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1434.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1434
    
----
commit d63ea457d11c89378c4d0f0173a5ac372b5a3f58
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T16:31:32Z

    [FLINK-2976] [streaming-java, streaming-scala] Set JobVertexID based on stream node hash

commit 4896bdcb0107059b2e0f57afc5d7776c26b820d7
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T16:51:44Z

    [FLINK-2976] [runtime] Add StateStore<T>

commit c748ac935edcd97a1cd7c49662420f55c9806354
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T17:33:31Z

    [FLINK-2976] [core, runtime, streaming-java] Add ApplicationID to ExecutionGraph

commit 6b20f6924df29c56783b0c0772a61a05639ef619
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T17:38:34Z

    [FLINK-2976] [runtime] Add setCount(long newCount) to CheckpointIDCounter

commit 60a0774133f460627a0d9949219299b1875d3c1f
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T17:47:07Z

    [FLINK-2976] [runtime, tests] Add SavepointCoordinator

commit 182b157cc92b09cdc5ce2867a4dd5cbc234a385d
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-01T17:49:14Z

    [FLINK-2976] [clients] Add savepoint commands to CliFrontend

commit a69c550967da3acb77ae2c8b5cef2982e835e6b4
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-02T15:21:06Z

    [FLINK-2976] [docs] Add docs about savepoints

commit b38481fe147a470127a045eb11edae3af198c134
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-12-03T10:35:46Z

    [FLINK-2976] [streaming-contrib] Use ApplicationID in DbStateBackend instead of JobID

----


> Save and load checkpoints manually
> ----------------------------------
>
>                 Key: FLINK-2976
>                 URL: https://issues.apache.org/jira/browse/FLINK-2976
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>    Affects Versions: 0.10.0
>            Reporter: Ufuk Celebi
>             Fix For: 1.0.0
>
>
> Currently, all checkpointed state is bound to a job. After the job finishes all state is lost. In case of an HA cluster, jobs can live longer than the cluster, but they still suffer from the same issue when they finish.
> Multiple users have requested the feature to manually save a checkpoint in order to resume from it at a later point. This is especially important for production environments. As an example, consider upgrading your existing production Flink program. Currently, you loose all the state of your program. With the proposed mechanism, it will be possible to save a checkpoint, stop and update your program, and then continue your program with the  checkpoint.
> The required operations can be simple:
> saveCheckpoint(JobID) => checkpointID: long
> loadCheckpoint(JobID, long) => void
> For the initial version, I would apply the following restriction:
> - The topology needs to stay the same (JobGraph parallelism, etc.)
> A user can configure this behaviour via the environment like the checkpointing interval. Furthermore, the user can trigger the save operation via the command line at arbitrary times and load a checkpoint when submitting a job, e.g.
> bin/flink checkpoint <JobID> => checkpointID: long 
> and
> bin/flink run --loadCheckpoint JobID [latest saved checkpoint]
> bin/flink run --loadCheckpoint (JobID,long) [specific saved checkpoint]
> As far as I can tell, the required mechanisms are similar to the ones implemented for JobManager high availability. We need to make sure to persist the CompletedCheckpoint instances as a pointer to the checkpoint state and to *not* remove saved checkpoint state.
> On the client side, we need to give the job and its vertices the same IDs to allow mapping the checkpoint state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)