Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/09/21 09:09:04 UTC

[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

    [ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14900296#comment-14900296 ] 

ASF GitHub Bot commented on FLINK-2354:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1153

    [FLINK-2354] Add job graph and checkpoint recovery

    ## tl;dr
    
    This PR introduces `JobGraph` and `SuccessfulCheckpoint` recovery for submitted programs in case of JobManager failures.
    
    ## General Idea
    
    The general idea is to persist job graphs and successful checkpoints in ZooKeeper.
    
    We have introduced JobManager high availability via ZooKeeper in #1016. My PR builds on top of this and adds initial support for program recovery. We can recover both programs and successful checkpoints in case of a JobManager failure as soon as a standby job manager is granted leadership.
    
    ZooKeeper's sweet spot is rather small data (in the KB range), but job graph and checkpoint state can grow larger. Therefore we don't persist the actual metadata directly, but use the state backend as a layer of indirection. We create state handles for the job graph and completed checkpoints and persist those. A state handle acts as a pointer to the actual data.
    
    At the moment, only the file system state backend is supported for this. The state handles need to be accessible by both task and job managers, e.g. via a DFS.
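
    To make the indirection concrete, here is a minimal sketch (hypothetical classes, not the actual ones from this PR) of writing a payload to the recovery directory and returning a small file handle, which is the only thing that would be serialized into ZooKeeper:

    ```java
    import java.io.IOException;
    import java.io.Serializable;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    /** Sketch of the state handle indirection (hypothetical classes). */
    public final class StateHandleIndirection {

        /** Small, serializable pointer to the actual data on the shared file system. */
        public static final class FileStateHandle implements Serializable {
            private static final long serialVersionUID = 1L;
            public final String filePath;

            public FileStateHandle(String filePath) {
                this.filePath = filePath;
            }
        }

        /** Writes the (possibly large) payload and returns the pointer that goes into ZooKeeper. */
        public static FileStateHandle writeToRecoveryDir(byte[] payload, String recoveryDir, String name)
                throws IOException {
            Path dir = Paths.get(recoveryDir);
            Files.createDirectories(dir);
            Path file = dir.resolve(name);
            Files.write(file, payload);
            return new FileStateHandle(file.toString());
        }

        /** Reads the payload back via the pointer, e.g. during recovery. */
        public static byte[] read(FileStateHandle handle) throws IOException {
            return Files.readAllBytes(Paths.get(handle.filePath));
        }
    }
    ```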
    
    ## Configuration
    
    The minimal required configuration:
    
    ```bash
    recovery.mode: ZOOKEEPER
    ha.zookeeper.quorum: <ZooKeeper quorum peers>
    state.backend: FILESYSTEM
    state.backend.fs.dir.recovery: /path/to/recovery
    ```
    
    I don't like the current configuration keys. Before the next release, I would like more consistent naming, e.g. prefixing everything with `recovery.zookeeper`.
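
    For example, the keys could then look something like this (purely illustrative; these names are not implemented in this PR):

    ```bash
    recovery.mode: ZOOKEEPER
    recovery.zookeeper.quorum: <ZooKeeper quorum peers>
    recovery.zookeeper.storage.dir: /path/to/recovery
    ```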
    
    ## ZooKeeper Nodes Overview
    
    Overview of ZNodes and components managing them:
    
    ```bash
    O- /flink
    |
    +----O /flink/jobgraphs (SubmittedJobGraphs)
    |    |
    |    +----O /flink/jobgraphs/<job-id>
    |
    +----O /flink/checkpoints  (CompletedCheckpoints)
    |    |
    |    +----O /flink/checkpoints/<job-id>
    |    .    |
    |    .    +----O /flink/checkpoints/<job-id>/1
    |    .    |
    |    .    +----O /flink/checkpoints/<job-id>/N
    |
    +----O /flink/checkpoint-counter (CheckpointIDCounter)
         |
         +----O /flink/checkpoint-counter/<job-id>
    ```
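
    For illustration, writing the job graph pointer under `/flink/jobgraphs/<job-id>` with Curator could look roughly like this (a sketch following the layout above; the quorum address, job ID and payload are made up):

    ```java
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    /** Sketch: store a serialized state handle under /flink/jobgraphs/<job-id>. */
    public class ZooKeeperLayoutSketch {

        public static void main(String[] args) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();

            // The payload would be the serialized state handle pointing to the
            // job graph in the recovery directory (not the job graph itself).
            byte[] serializedStateHandle = new byte[0];
            String jobId = "7f5e6dcbf348f5a4a7a1b7c3d2e1f0ab"; // hypothetical job ID

            // Persistent ZNode so the pointer survives JobManager failures.
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath("/flink/jobgraphs/" + jobId, serializedStateHandle);

            client.close();
        }
    }
    ```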
    
    ## Implementation
    
    ### Submission vs. Recovery (JobManager and SubmittedJobGraphs)
    
    - `ZooKeeperSubmittedJobGraphs` manages `SubmittedJobGraph` state handles in ZooKeeper
    - Submission and recovery follow mostly the same code paths (see `JobManager#submitJob()`).
    - On (initial) submission:
      - After writing to ZooKeeper, the JobManager synchronously checks whether it is still the leader.
      - If it is not, the job is not scheduled for execution but is kept in ZooKeeper; a future leading JobManager has to recover it. The client currently sees this as a successful submission. The job is not removed in this case, because another job manager might recover it between the write and the removal. In that case a job would be running without being in ZooKeeper and without being acked to the client (see the sketch after this list).
    - On recovery:
      - Recovery is triggered when leadership is granted, after the configured delay between execution retries.
      - All available jobs are scheduled for execution.
    - The ZNode for job graphs is monitored for modifications during operations. This way, a job manager can (eventually) detect if another job manager adds/removes a job and react to it.
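
    As a sketch of the submission-side leader check described above (hypothetical interfaces and method names, not the actual `JobManager#submitJob()` code):

    ```java
    /** Sketch of "write first, then check leadership" on submission. */
    public class SubmissionSketch {

        /** Hypothetical store interface writing the state handle to ZooKeeper. */
        public interface SubmittedJobGraphStore {
            void putJobGraph(String jobId, byte[] serializedStateHandle) throws Exception;
        }

        /** Hypothetical view on the leader election service. */
        public interface LeaderElectionService {
            boolean hasLeadership();
        }

        private final SubmittedJobGraphStore jobGraphStore;
        private final LeaderElectionService leaderElectionService;

        public SubmissionSketch(SubmittedJobGraphStore store, LeaderElectionService les) {
            this.jobGraphStore = store;
            this.leaderElectionService = les;
        }

        /** Returns true if the job should be scheduled for execution now. */
        public boolean submit(String jobId, byte[] serializedStateHandle) throws Exception {
            // 1) Persist the job graph pointer first, so a future leader can recover it.
            jobGraphStore.putJobGraph(jobId, serializedStateHandle);

            // 2) Only schedule if we are still the leader. Otherwise the entry is
            //    intentionally left in ZooKeeper for the next leader; removing it
            //    here could race with a concurrent recovery by that leader.
            return leaderElectionService.hasLeadership();
        }
    }
    ```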
    
    ### CompletedCheckpoints
    
    - `ZooKeeperCompletedCheckpoints` manages `SuccessfulCheckpoint` state handles in ZooKeeper (per job). Note that a `SuccessfulCheckpoint` usually contains pointers to further state handles, so this adds another layer of indirection.
    - Every completed checkpoint is added to ZooKeeper and identified by its checkpoint ID.
    - On recovery, only the latest checkpoint is recovered. Even if more than one checkpoint is available, we recover a single one to keep the checkpoint history consistent in corner cases where multiple job managers run the same job with checkpointing for some time (currently we retain only one checkpoint anyway, but this matters if we ever choose to retain more).
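
    A minimal sketch of the "recover only the latest checkpoint" behaviour, using an in-memory map as a stand-in for the ZooKeeper-backed store (all names are made up):

    ```java
    import java.util.concurrent.ConcurrentSkipListMap;

    /** Sketch: keep completed checkpoints by ID, recover only the latest one. */
    public class CompletedCheckpointsSketch {

        // checkpoint ID -> serialized state handle of the SuccessfulCheckpoint
        private final ConcurrentSkipListMap<Long, byte[]> checkpoints = new ConcurrentSkipListMap<>();

        public void addCheckpoint(long checkpointId, byte[] serializedStateHandle) {
            checkpoints.put(checkpointId, serializedStateHandle);
        }

        /** Returns the handle of the latest checkpoint, or null if none exists. */
        public byte[] recoverLatest() {
            // Even if several checkpoints are present, only the one with the
            // highest ID is used, which keeps the recovered history consistent.
            return checkpoints.isEmpty() ? null : checkpoints.lastEntry().getValue();
        }
    }
    ```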
    
    ### CheckpointIDCounter
    
    - `ZooKeeperCheckpointIDCounter` manages a shared counter in ZooKeeper (per job).
    - The `Checkpointed` interface requires ascending checkpoint IDs for each checkpoint.
    - We use a shared counter (per job) via a Curator recipe for this.
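
    One way to implement this with Curator's shared counter recipe (a sketch; the class and the retry loop are assumptions, not the PR's actual `ZooKeeperCheckpointIDCounter`):

    ```java
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.shared.SharedCount;
    import org.apache.curator.framework.recipes.shared.VersionedValue;

    /** Sketch of a per-job, ascending checkpoint ID counter in ZooKeeper. */
    public class CheckpointIDCounterSketch {

        private final SharedCount sharedCount;

        public CheckpointIDCounterSketch(CuratorFramework client, String jobId) {
            // One counter ZNode per job, e.g. /flink/checkpoint-counter/<job-id>.
            this.sharedCount = new SharedCount(client, "/flink/checkpoint-counter/" + jobId, 1);
        }

        public void start() throws Exception {
            sharedCount.start();
        }

        /** Returns the next checkpoint ID; IDs are ascending across job managers. */
        public long getAndIncrement() throws Exception {
            while (true) {
                VersionedValue<Integer> current = sharedCount.getVersionedValue();
                // Compare-and-set against the version read above; retry on contention.
                if (sharedCount.trySetCount(current, current.getValue() + 1)) {
                    return current.getValue();
                }
            }
        }

        public void stop() throws Exception {
            sharedCount.close();
        }
    }
    ```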
    
    ### Akka messages
    
    - This PR introduces two new JobManager message types:
      - RecoverAllJobs
      - RecoverJob(JobID)
    - The ZooKeeper operations are blocking, and all JobManager actor calls need to make sure *not* to block the JobManager. I've tried to cover all cases where a ZooKeeper operation is triggered (see the sketch after this list).
    - For tests, I didn't manage to stop the JobManager actor without running the `postStop` method. Because this method has some cleanup logic (removing job graphs and checkpoints), all JobManager recovery tests run the JobManager as a separate `JobManagerProcess`, which is quite heavyweight. If someone knows a way to stop the actor without `postStop` being called, it would be great to refactor this.
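
    A sketch of keeping the blocking ZooKeeper calls off the actor thread by piping the result back as a message (hypothetical messages and store interface, not the actual JobManager code):

    ```java
    import akka.actor.UntypedActor;
    import akka.dispatch.Futures;
    import akka.pattern.Patterns;
    import scala.concurrent.Future;

    /** Sketch: run blocking recovery outside the actor and pipe the result back. */
    public class RecoveryActorSketch extends UntypedActor {

        /** Hypothetical messages. */
        public static final class RecoverAllJobs {}
        public static final class RecoveredJobs {
            public final Object jobs;
            public RecoveredJobs(Object jobs) { this.jobs = jobs; }
        }

        /** Hypothetical store interface; recoverJobGraphs() blocks on ZooKeeper. */
        public interface SubmittedJobGraphStore {
            Object recoverJobGraphs() throws Exception;
        }

        private final SubmittedJobGraphStore store;

        public RecoveryActorSketch(SubmittedJobGraphStore store) {
            this.store = store;
        }

        @Override
        public void onReceive(Object message) {
            if (message instanceof RecoverAllJobs) {
                // Run the blocking ZooKeeper access asynchronously (in practice on a
                // dedicated executor) and send the result back as a message instead
                // of waiting for it inside the actor.
                Future<RecoveredJobs> recovery = Futures.future(
                        () -> new RecoveredJobs(store.recoverJobGraphs()),
                        getContext().dispatcher());
                Patterns.pipe(recovery, getContext().dispatcher()).to(getSelf());
            } else if (message instanceof RecoveredJobs) {
                // ... schedule the recovered jobs for execution (omitted) ...
            } else {
                unhandled(message);
            }
        }
    }
    ```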
    
    ## Next Steps
    
    - The behaviour on recovery (a fixed delay) is too simplistic.
    - The client is not fully integrated and submits jobs in detached mode if the recovery mode is set to ZooKeeper.
    
    ## Tests
    
    There was a Travis/AWS outage yesterday, so I couldn't run as many builds as I would have liked. I would like to do a couple more runs before we merge this.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink recovery-2354

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1153
    
----
commit aa0be0a27b7077fcdb303d99db40a4fb85acf82a
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-09-03T13:13:28Z

    [runtime] Add type parameter to ByteStreamStateHandle

commit f37041bd705e71a3d7b2897e498fbbe625b38217
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-09-19T17:53:18Z

    [clients] Submit job detached if recovery enabled

commit 83523771621eb8446a365e769f7b525d6430bcbb
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-09-20T11:08:24Z

    [FLINK-2652] [tests] Temporary ignore flakey PartitionRequestClientFactoryTest

commit ad9b6572b73229ed92a6b3a0eee08d36a8e8bc6e
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-09-01T15:25:46Z

    [FLINK-2354] [runtime] Add job graph and checkpoint recovery

----


> Recover running jobs on JobManager failure
> ------------------------------------------
>
>                 Key: FLINK-2354
>                 URL: https://issues.apache.org/jira/browse/FLINK-2354
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: master
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the leading job manager loses all running jobs when it fails. After a new leading job manager is elected, it is not possible to recover any previously running jobs.
> Solution: The leading job manager, which receives the job graph, writes 1) the job graph to a state backend, and 2) a reference to the respective state handle to ZooKeeper. In general, job graphs can become large (multiple MBs, because they include closures etc.). ZooKeeper is not designed for data of this size. The level of indirection via the reference to the state backend keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>        +- job id i
>             +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs between job managers. The currentJobs node needs to satisfy the following invariant: There is a reference to a job graph with id i IFF the respective job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if resubmitted). The next step is to backup the runtime state handles of checkpoints in a similar manner.
> ---
> This work will be based on [~trohrmann@apache.org]'s implementation of FLINK-2291. The leader election service notifies the job manager about granted/revoked leadership. This notification happens via Akka and thus serially *per* job manager, but it only results in eventually consistent state between job managers. For brief periods of time it is possible that a new leader has been granted leadership before the old one's leadership has been revoked.
> [~trohrmann@apache.org], can you confirm that leadership does not guarantee mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to coordinate the access to currentJobs. The lock needs to be acquired on leadership.
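> A rough sketch of that with Curator's shared-lock recipe (InterProcessMutex; illustrative only, all names are assumptions):
>     import org.apache.curator.framework.CuratorFramework;
>     import org.apache.curator.framework.recipes.locks.InterProcessMutex;
>
>     class CurrentJobsGuard {
>         private final InterProcessMutex lock;
>         CurrentJobsGuard(CuratorFramework client) {
>             this.lock = new InterProcessMutex(client, "/flink/currentJobs-lock");
>         }
>         void withLock(Runnable access) throws Exception {
>             lock.acquire();    // acquire on granted leadership
>             try {
>                 access.run();  // read/modify /flink/currentJobs here
>             } finally {
>                 lock.release();
>             }
>         }
>     }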
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and writing to state backend and ZooKeeper with expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting interleavings
> - Process failure integration tests with single and multiple running jobs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)