Posted to user@flink.apache.org by Hao Sun <ha...@zendesk.com> on 2017/12/01 06:35:32 UTC

Re: Questions about checkpoints/savepoints

Hi team, I have a similar use case; do we have any answers on this?
When we trigger a savepoint, can we store that information in ZooKeeper as
well? That way I could avoid S3 file listing and would not have to use
other external services.
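
In case it helps, this is roughly what I have in mind on the launcher side,
once the savepoint path is known from the CLI output or the REST API. A
minimal sketch, assuming Apache Curator is available; the znode path and
object name below are made up:

import java.nio.charset.StandardCharsets

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

object SavepointRegistry {
  // Hypothetical znode that always holds the latest savepoint path.
  private val LatestSavepointZnode = "/flink/my-job/latest-savepoint"

  def recordSavepoint(zkQuorum: String, savepointPath: String): Unit = {
    val client = CuratorFrameworkFactory.newClient(
      zkQuorum, new ExponentialBackoffRetry(1000, 3))
    client.start()
    try {
      val data = savepointPath.getBytes(StandardCharsets.UTF_8)
      // Create the znode on first use, overwrite it afterwards.
      if (client.checkExists().forPath(LatestSavepointZnode) == null) {
        client.create().creatingParentsIfNeeded().forPath(LatestSavepointZnode, data)
      } else {
        client.setData().forPath(LatestSavepointZnode, data)
      }
    } finally {
      client.close()
    }
  }
}

On recovery, the launcher would read this znode back and pass the stored
path to flink run -s, instead of listing S3.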

On Wed, Oct 25, 2017 at 11:19 PM vipul singh <ne...@gmail.com> wrote:

> As a followup to the above, is there a way to get the last checkpoint metadata
> location inside the *notifyCheckpointComplete* method? I tried poking
> around, but didn't see a way to achieve this. In case there isn't, is there
> any other way to save the actual checkpoint metadata location into a
> datastore (DynamoDB etc.)?
>
> We are looking to save the savepoint/externalized checkpoint metadata
> location in some storage space, so that we can pass this information to the
> flink run command during recovery (thereby removing the possibility of any
> read-after-write consistency issues arising out of listing file paths etc.).
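>
> To make the idea concrete, this is roughly what I am picturing (only a
> sketch: *notifyCheckpointComplete* exposes just the checkpoint id, not the
> metadata path, so only the id can be recorded this way; the table and
> attribute names below are made up, using the AWS SDK):
>
> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.runtime.state.CheckpointListener
>
> import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
> import com.amazonaws.services.dynamodbv2.model.AttributeValue
>
> import scala.collection.JavaConverters._
>
> // A pass-through operator that is told about completed checkpoints.
> // Note: only the checkpoint id is passed in; the metadata file location
> // is not exposed here, which is exactly the limitation described above.
> class CheckpointTracker[T] extends RichMapFunction[T, T] with CheckpointListener {
>
>   @transient private var dynamo: AmazonDynamoDB = _
>
>   override def open(parameters: Configuration): Unit = {
>     dynamo = AmazonDynamoDBClientBuilder.defaultClient()
>   }
>
>   override def map(value: T): T = value
>
>   override def notifyCheckpointComplete(checkpointId: Long): Unit = {
>     // "flink-checkpoints" is a hypothetical table name.
>     dynamo.putItem("flink-checkpoints", Map(
>       "jobName" -> new AttributeValue("my-job"),
>       "lastCompletedCheckpointId" -> new AttributeValue().withN(checkpointId.toString)
>     ).asJava)
>   }
> }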
>
> Thanks,
> Vipul
>
> On Tue, Oct 24, 2017 at 11:53 PM, vipul singh <ne...@gmail.com> wrote:
>
>> Thanks Aljoscha for the explanations. I was able to recover from the
>> last externalized checkpoint by using flink run -s <metadata file>
>> <options>.
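>>
>> For the record, the restore command looked roughly like this (the
>> metadata file name and jar details below are made up):
>>
>> flink run -s s3://my-bucket/checkpoints/metadata/checkpoint_metadata-1a2b3c \
>>   -c com.example.MyJob my-job.jar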
>>
>> I am curious, are there at the moment any options to save the metadata
>> file name to some other place like DynamoDB? The reason why I am asking is
>> that for the launcher code we are writing, we want to ensure that if a Flink
>> job crashes, we can just restart it from the last known externalized
>> checkpoint. In the present scenario, we have to list the contents of the S3
>> bucket which saves the metadata to find the last metadata file written before
>> the failure, and there might be a window where we run into the
>> read-after-write consistency limitations of S3. Thoughts?
>>
>> On Tue, Oct 24, 2017 at 2:13 AM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> That distinction with externalised checkpoints is a bit of a pitfall and
>>> I'm hoping that we can get rid of it in the next
>>> version or the version after that. With that change, all checkpoints would
>>> always be externalised, since it adds no noticeable overhead.
>>>
>>> Regarding read-after-write consistency, you should be fine since the
>>> "externalised checkpoint", i.e. the metadata, is only one file. If you know
>>> the file path (either from the Flink dashboard or by looking at the S3
>>> bucket) you can restore from it.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 24. Oct 2017, at 08:22, vipul singh <ne...@gmail.com> wrote:
>>>
>>> Thanks Tony, that was the issue. I was thinking that when we use RocksDB
>>> and provide an S3 path, it uses externalized checkpoints by default. Thanks
>>> so much!
>>>
>>> I have one followup question. Say in the above case I terminate the
>>> cluster; since the metadata is on S3 and not on local storage, does
>>> Flink avoid the read-after-write consistency issues of S3? Would that be a
>>> valid concern, or do externalized checkpoints handle that case as well,
>>> avoiding file system listing operations when retrieving externalized
>>> checkpoints from S3?
>>>
>>> Thanks,
>>> Vipul
>>>
>>>
>>>
>>> On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei <to...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Did you enable externalized checkpoints? [1]
>>>>
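>>>> If not, enabling them is one extra line on the checkpoint config (a
>>>> sketch against the Flink 1.3 API, assuming env is your
>>>> StreamExecutionEnvironment):
>>>>
>>>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>>>>
>>>> // Retain the checkpoint metadata on cancellation, so the job can be
>>>> // restarted manually with flink run -s <metadata file>.
>>>> env.getCheckpointConfig.enableExternalizedCheckpoints(
>>>>   ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>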
>>>> Best,
>>>> Tony Wei
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#externalized-checkpoints
>>>>
>>>> 2017-10-24 13:07 GMT+08:00 vipul singh <ne...@gmail.com>:
>>>>
>>>>> Thanks Aljoscha for the answer above.
>>>>>
>>>>> I am experimenting with savepoints and checkpoints on my end, so that
>>>>> we can build a fault-tolerant application with exactly-once semantics.
>>>>>
>>>>> I have been able to test various scenarios, but have doubts about one
>>>>> use case.
>>>>>
>>>>> My app is running on an EMR cluster, and I am trying to test the case
>>>>> where the EMR cluster is terminated. I have read that
>>>>> *state.checkpoints.dir* is responsible for storing metadata
>>>>> information, with links to the data files in
>>>>> *state.backend.fs.checkpointdir*.
>>>>>
>>>>> For my application I have configured both
>>>>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir*.
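>>>>>
>>>>> For reference, I believe this corresponds to the following entries in
>>>>> flink-conf.yaml (the bucket and paths below are made up):
>>>>>
>>>>> state.backend.fs.checkpointdir: s3://my-bucket/flink/checkpoints/data
>>>>> state.checkpoints.dir: s3://my-bucket/flink/checkpoints/metadata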
>>>>>
>>>>> Also I have the following in my main app:
>>>>>
>>>>> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>>>>>
>>>>> val CHECKPOINT_LOCATION =
>>>>>   s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>>>>>
>>>>> // RocksDB state backend that checkpoints to S3.
>>>>> val backend: RocksDBStateBackend = new RocksDBStateBackend(CHECKPOINT_LOCATION)
>>>>>
>>>>> env.setStateBackend(backend)
>>>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
>>>>> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
>>>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>>>>>
>>>>>
>>>>> In the application startup logs I can see the
>>>>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir* values
>>>>> being loaded. However, when a checkpoint happens, I don't see any content in
>>>>> the metadata dir. Is there something I am missing? Please let me know. I am
>>>>> using Flink version 1.3.
>>>>>
>>>>> Thanks,
>>>>> Vipul
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljoscha@apache.org
>>>>> > wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Flink does not rely on file system operations to list contents; all
>>>>>> necessary file paths are stored in the metadata file, as you guessed. This
>>>>>> is the reason savepoints also work with file systems that "only" have
>>>>>> read-after-write consistency.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>>
>>>>>> On 10. Oct 2017, at 03:01, vipul singh <ne...@gmail.com> wrote:
>>>>>>
>>>>>> Thanks Stefan for the answers above. These are really helpful.
>>>>>>
>>>>>> I have a few followup questions:
>>>>>>
>>>>>>    1. I see my savepoints are created in a folder, which has a
>>>>>>    _metadata file and another file. Looking at the code
>>>>>>    <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191> it seems like
>>>>>>    the metadata file contains task states, operator state and
>>>>>>    master states <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>.
>>>>>>    What is the purpose of the other file in the savepoint folder? My guess is
>>>>>>    that it is a checkpoint data file?
>>>>>>    2. I am planning to use S3 as my state backend, so I want to ensure
>>>>>>    that application restarts are not affected by the read-after-write
>>>>>>    consistency of S3 (if I use S3 as a savepoint backend). I am curious how
>>>>>>    Flink restores data from the _metadata file and the other file. Does the
>>>>>>    _metadata file contain the paths to these other files, or would it do a
>>>>>>    listing on the S3 folder?
>>>>>>
>>>>>>
>>>>>> Please let me know,
>>>>>>
>>>>>> Thanks,
>>>>>> Vipul
>>>>>>
>>>>>> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <
>>>>>> s.richter@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have answered your questions inline:
>>>>>>>
>>>>>>>
>>>>>>>    1. It seems to me that checkpoints can be treated as Flink's
>>>>>>>    internal recovery mechanism, and savepoints act more as user-defined
>>>>>>>    recovery points. Would that be a correct assumption?
>>>>>>>
>>>>>>> You could see it that way, but I would describe savepoints more as
>>>>>>> user-defined *restart* points than *recovery* points. Please take a look at
>>>>>>> my answers in this thread, because they cover most of your question:
>>>>>>>
>>>>>>>
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html
>>>>>>>
>>>>>>>
>>>>>>>    2. While cancelling an application with the -s option, it specifies
>>>>>>>    the savepoint location. Is there a way during application startup to
>>>>>>>    identify the last known savepoint from a folder by itself, and restart from
>>>>>>>    there? Since I am saving my savepoints on S3, I want to avoid issues
>>>>>>>    arising from the *ls* command on S3 due to its read-after-write
>>>>>>>    consistency.
>>>>>>>
>>>>>>> I don’t think that this feature exists; you have to specify the
>>>>>>> savepoint.
>>>>>>>
>>>>>>>
>>>>>>>    3. Suppose my application has a checkpoint at point t1, and say
>>>>>>>    I cancel this application sometime in the future before the next available
>>>>>>>    checkpoint (say at t1+x). If I start the application without specifying a
>>>>>>>    savepoint, it will start from the last known checkpoint (at t1), which won't
>>>>>>>    have the application state saved, since I had cancelled the application.
>>>>>>>    Would this be a correct assumption?
>>>>>>>
>>>>>>> If you restart a canceled application it will not consider
>>>>>>> checkpoints. They are only considered in recovery on failure. You need to
>>>>>>> specify a savepoint or externalized checkpoint for restarts to make
>>>>>>> explicit that you intend to restart a job, and not to run a new instance of
>>>>>>> the job.
>>>>>>>
>>>>>>>
>>>>>>>    4. Would using
>>>>>>>    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be the same as
>>>>>>>    manually saving regular savepoints?
>>>>>>>
>>>>>>> Not the same, because checkpoints and savepoints are different in
>>>>>>> certain aspects, but both methods leave you with something that survives
>>>>>>> job cancellation and can be used to restart from a certain state.
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Vipul
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Vipul
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Vipul
>>>
>>>
>>>
>>
>>
>> --
>> Thanks,
>> Vipul
>>
>
>
>
> --
> Thanks,
> Vipul
>