Posted to issues@flink.apache.org by "Ufuk Celebi (JIRA)" <ji...@apache.org> on 2017/05/15 13:54:04 UTC

[jira] [Closed] (FLINK-5777) Pass savepoint information to CheckpointingOperation

     [ https://issues.apache.org/jira/browse/FLINK-5777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ufuk Celebi closed FLINK-5777.
------------------------------
       Resolution: Fixed
    Fix Version/s: 1.3.0

Fixed in 6e7a91741708a2b167a2bbca5dda5b2059df5e18.

> Pass savepoint information to CheckpointingOperation
> ----------------------------------------------------
>
>                 Key: FLINK-5777
>                 URL: https://issues.apache.org/jira/browse/FLINK-5777
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 1.3.0
>
>
> In order to make savepoints self-contained in a single directory, we need to pass some information to {{StreamTask#CheckpointingOperation}}.
> I propose to extend the {{CheckpointMetaData}} for this.
> We currently have some overlap between CheckpointMetaData, CheckpointMetrics, and the manually passed checkpoint ID and checkpoint timestamp. We should restrict CheckpointMetaData to the integral meta data that needs to be passed to StreamTask#CheckpointingOperation.
> This means that we move CheckpointMetrics out of CheckpointMetaData. The BarrierBuffer/BarrierTracker then creates the CheckpointMetrics separately and sends them back with the acknowledge message.
> CheckpointMetaData should be extended with the following properties:
> - boolean isSavepoint
> - String targetDirectory
> There are two code paths that lead to the CheckpointingOperation:
> 1. From CheckpointCoordinator via RPC to StreamTask#triggerCheckpoint
> - Execution#triggerCheckpoint(long, long) => triggerCheckpoint(CheckpointMetaData)
> - TaskManagerGateway#triggerCheckpoint(ExecutionAttemptID, JobID, long, long) => TaskManagerGateway#triggerCheckpoint(ExecutionAttemptID, JobID, CheckpointMetaData)
> - Task#triggerCheckpointBarrier(long, long) => Task#triggerCheckpointBarrier(CheckpointMetaData)
> 2. From intermediate streams via the CheckpointBarrier to StreamTask#triggerCheckpointOnBarrier
> - triggerCheckpointOnBarrier(CheckpointMetaData) => triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)
> - CheckpointBarrier(long, long) => CheckpointBarrier(CheckpointMetaData)
> - AcknowledgeCheckpoint(CheckpointMetaData) => AcknowledgeCheckpoint(long, CheckpointMetrics)
> The state backends provide another stream factory that is called in CheckpointingOperation when the meta data indicates a savepoint. The state backends can choose whether they return the regular checkpoint stream factory in that case or a special one for savepoints. That way, backends that don’t checkpoint to a file system can special-case savepoints easily.
> - FsStateBackend: return special FsCheckpointStreamFactory with different directory layout
> - MemoryStateBackend: return regular checkpoint stream factory (MemCheckpointStreamFactory) => The _metadata file will contain all state as the state handles are part of it
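
The proposal quoted above could be sketched roughly as follows. This is a minimal, simplified illustration and not the code from the referenced commit: only the names CheckpointMetaData, isSavepoint and targetDirectory come from the issue text, while StreamFactory, Backend, MemoryLikeBackend, FsLikeBackend, CheckpointingOperationSketch and selectFactory are hypothetical stand-ins invented for this example.

```java
import java.util.Objects;

// Simplified stand-in for the extended CheckpointMetaData (not the real Flink class).
final class CheckpointMetaData {
    private final long checkpointId;
    private final long timestamp;
    private final boolean isSavepoint;    // proposed property
    private final String targetDirectory; // proposed property; may be null for regular checkpoints (assumption)

    CheckpointMetaData(long checkpointId, long timestamp, boolean isSavepoint, String targetDirectory) {
        if (isSavepoint) {
            // Assumption of this sketch: a savepoint always names its target directory.
            Objects.requireNonNull(targetDirectory, "savepoints need a target directory");
        }
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.isSavepoint = isSavepoint;
        this.targetDirectory = targetDirectory;
    }

    long getCheckpointId() { return checkpointId; }
    long getTimestamp() { return timestamp; }
    boolean isSavepoint() { return isSavepoint; }
    String getTargetDirectory() { return targetDirectory; }
}

// Hypothetical stand-in for a checkpoint stream factory.
interface StreamFactory {
    String describe();
}

// Hypothetical backend contract: by default, savepoints reuse the regular factory.
interface Backend {
    StreamFactory createStreamFactory();

    default StreamFactory createSavepointStreamFactory(String targetDirectory) {
        return createStreamFactory();
    }
}

// Mirrors the MemoryStateBackend bullet: the regular factory is returned for savepoints too.
final class MemoryLikeBackend implements Backend {
    @Override
    public StreamFactory createStreamFactory() {
        return () -> "memory";
    }
}

// Mirrors the FsStateBackend bullet: savepoints get a factory with a different directory.
final class FsLikeBackend implements Backend {
    private final String checkpointDirectory;

    FsLikeBackend(String checkpointDirectory) {
        this.checkpointDirectory = checkpointDirectory;
    }

    @Override
    public StreamFactory createStreamFactory() {
        return () -> "fs:" + checkpointDirectory;
    }

    @Override
    public StreamFactory createSavepointStreamFactory(String targetDirectory) {
        return () -> "fs-savepoint:" + targetDirectory;
    }
}

final class CheckpointingOperationSketch {
    // Picks the stream factory based on the meta data, as the issue describes.
    static StreamFactory selectFactory(Backend backend, CheckpointMetaData metaData) {
        return metaData.isSavepoint()
                ? backend.createSavepointStreamFactory(metaData.getTargetDirectory())
                : backend.createStreamFactory();
    }

    public static void main(String[] args) {
        CheckpointMetaData savepoint = new CheckpointMetaData(2L, 200L, true, "/savepoints/sp-2");
        System.out.println(selectFactory(new FsLikeBackend("/checkpoints"), savepoint).describe());
        System.out.println(selectFactory(new MemoryLikeBackend(), savepoint).describe());
    }
}
```

Putting the savepoint decision behind a default method keeps backends that do not write to a file system unchanged, which is the design point the last paragraph of the description makes.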



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)