Posted to user@flink.apache.org by vipul singh <ne...@gmail.com> on 2017/10/10 01:01:56 UTC

Re: Questions about checkpoints/savepoints

Thanks Stefan for the answers above. These are really helpful.

I have a few followup questions:

   1. I see my savepoints are created in a folder, which has a _metadata
   file and another file. Looking at the code
   <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191>
   it seems like the metadata file contains task states, operator state,
   and master states
   <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>.
   What is the purpose of the other file in the savepoint folder? My guess is
   that it is a checkpoint file?
   2. I am planning to use S3 as my state backend, so I want to ensure that
   application restarts are not affected by the read-after-write consistency
   of S3 (if I use S3 as a savepoint backend). I am curious how Flink restores
   data from the _metadata file and the other file. Does the _metadata file
   contain paths to these other files, or would it do a listing on the S3
   folder?


Please let me know,

Thanks,
Vipul

On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <s.richter@data-artisans.com> wrote:

> Hi,
>
> I have answered your questions inline:
>
>
>    1. It seems to me that checkpoints can be treated as Flink's internal
>    recovery mechanism, and savepoints act more as user-defined recovery
>    points. Would that be a correct assumption?
>
> You could see it that way, but I would describe savepoints more as
> user-defined *restart* points than *recovery* points. Please take a look at
> my answers in this thread, because they cover most of your question:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html
>
>
>    1. While cancelling an application with the -s option, it specifies the
>    savepoint location. Is there a way during application startup to identify
>    the last known savepoint from a folder by itself, and restart from there?
>    Since I am saving my savepoints on S3, I want to avoid issues arising from
>    the *ls* command on S3 due to the read-after-write consistency of S3.
>
> I don’t think that this feature exists; you have to specify the savepoint.
>
>
>    1. Suppose my application has a checkpoint at point t1, and say I
>    cancel this application sometime in the future before the next available
>    checkpoint (say t1+x). If I start the application without specifying the
>    savepoint, it will start from the last known checkpoint (at t1), which won't
>    have the application state saved, since I had cancelled the application.
>    Would this be a correct assumption?
>
> If you restart a canceled application it will not consider checkpoints.
> They are only considered in recovery on failure. You need to specify a
> savepoint or externalized checkpoint for restarts to make explicit that you
> intend to restart a job, and not to run a new instance of the job.
>
>
>    1. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be
>    the same as manually saving regular savepoints?
>
> Not the same, because checkpoints and savepoints are different in certain
> aspects, but both methods leave you with something that survives job
> cancelation and can be used to restart from a certain state.
>
> Best,
> Stefan
>
>


-- 
Thanks,
Vipul

Re: Questions about checkpoints/savepoints

Posted by Hao Sun <ha...@zendesk.com>.
Hi team, I have a similar use case. Do we have any answers on this?
When we trigger a savepoint, can we store that information in ZK as well,
so that I can avoid S3 file listing and do not have to use other external
services?

On Wed, Oct 25, 2017 at 11:19 PM vipul singh <ne...@gmail.com> wrote:

> As a followup to the above, is there a way to get the last checkpoint
> metadata location inside the *notifyCheckpointComplete* method? I tried
> poking around, but didn't see a way to achieve this. Or is there any other
> way to save the actual checkpoint metadata location into a
> datastore (DynamoDB etc.)?
>
> We are looking to save the savepoint/externalized checkpoint metadata
> location in some storage space, so that we can pass this information to
> the flink run command during recovery (thereby removing the possibility of
> any read-after-write consistency issues arising out of listing file paths etc.).
>
> Thanks,
> Vipul
>
> On Tue, Oct 24, 2017 at 11:53 PM, vipul singh <ne...@gmail.com> wrote:
>
>> Thanks Aljoscha for the explanations. I was able to recover from the
>> last externalized checkpoint by using flink run -s <metadata file>
>> <options>
>>
>> I am curious, are there any options to save the metadata file name to
>> some other place like DynamoDB etc. at the moment? The reason I am asking
>> is that, for the launcher code we are writing, we want to ensure that if a
>> Flink job crashes, we can just start it from the last known externalized
>> checkpoint. In the present scenario, we have to list the contents of the S3
>> bucket which saves the metadata to see the last metadata before failure, and
>> there might be a window where we run into the read-after-write consistency
>> of S3. Thoughts?
>>
>> On Tue, Oct 24, 2017 at 2:13 AM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> That distinction with externalised checkpoints is a bit of a pitfall and
>>> I'm hoping that we can actually get rid of that distinction in the next
>>> version or the version after that. With that change, all checkpoints would
>>> always be externalised, since it's not really any noticeable overhead.
>>>
>>> Regarding read-after-write consistency, you should be fine since the
>>> "externalised checkpoint", i.e. the metadata, is only one file. If you know
>>> the file-path (either from the Flink dashboard or by looking at the S3
>>> bucket) you can restore from it.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 24. Oct 2017, at 08:22, vipul singh <ne...@gmail.com> wrote:
>>>
>>> Thanks Tony, that was the issue. I was thinking that when we use RocksDB
>>> and provide an S3 path, it uses externalized checkpoints by default. Thanks
>>> so much!
>>>
>>> I have one followup question. Say in the above case I terminate the
>>> cluster. Since the metadata is on S3, and not on local storage, does
>>> Flink avoid the read-after-write consistency of S3? Would it be a valid
>>> concern, or do we handle that case in externalized checkpoints as well, and
>>> not deal with file system operations while retrieving externalized
>>> checkpoints on S3?
>>>
>>> Thanks,
>>> Vipul
>>>
>>>
>>>
>>> On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei <to...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Did you enable externalized checkpoints? [1]
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#externalized-checkpoints
>>>>
>>>> 2017-10-24 13:07 GMT+08:00 vipul singh <ne...@gmail.com>:
>>>>
>>>>> Thanks Aljoscha for the answer above.
>>>>>
>>>>> I am experimenting with savepoints and checkpoints on my end, so that
>>>>> we can build a fault-tolerant application with exactly-once semantics.
>>>>>
>>>>> I have been able to test various scenarios, but have doubts about one
>>>>> use case.
>>>>>
>>>>> My app is running on an EMR cluster, and I am trying to test the case
>>>>> when an EMR cluster is terminated. I have read that
>>>>> *state.checkpoints.dir* is responsible for storing metadata
>>>>> information, and links to data files in
>>>>> *state.backend.fs.checkpointdir*.
>>>>>
>>>>> For my application I have configured both
>>>>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir*.
>>>>> Also I have the following in my main app:
>>>>>
>>>>> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>>>>>
>>>>> val CHECKPOINT_LOCATION = s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>>>>>
>>>>> val backend:RocksDBStateBackend =
>>>>>   new RocksDBStateBackend(CHECKPOINT_LOCATION)
>>>>>
>>>>> env.setStateBackend(backend)
>>>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
>>>>> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
>>>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>>>>>
>>>>>
>>>>> In the application startup logs I can see the
>>>>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir* values
>>>>> being loaded. However, when the checkpoint happens I don't see any content in
>>>>> the metadata dir. Is there something I am missing? Please let me know. I am
>>>>> using Flink version 1.3.
>>>>>
>>>>> Thanks,
>>>>> Vipul
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Flink does not rely on file system operations to list contents; all
>>>>>> necessary file paths are stored in the metadata file, as you guessed. This
>>>>>> is the reason savepoints also work with file systems that "only" have
>>>>>> read-after-write consistency.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>>

Re: Questions about checkpoints/savepoints

Posted by vipul singh <ne...@gmail.com>.
As a followup to the above, is there a way to get the last checkpoint metadata
location inside the *notifyCheckpointComplete* method? I tried poking around,
but didn't see a way to achieve this. Or is there any other way to
save the actual checkpoint metadata location into a
datastore (DynamoDB etc.)?

We are looking to save the savepoint/externalized checkpoint metadata
location in some storage space, so that we can pass this information to
the flink run command during recovery (thereby removing the possibility of any
read-after-write consistency issues arising out of listing file paths etc.).

Thanks,
Vipul
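
[Editor's sketch] One way the launcher idea above might look, as a minimal Scala sketch rather than anything Flink provides. It assumes the launcher learns the metadata path out of band (for example from the savepoint location reported on cancel, or from the Flink dashboard) and records it under a single known key. CheckpointPointerStore and Launcher are hypothetical names, and a local file stands in for DynamoDB or ZooKeeper.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical pointer store: one local file stands in for DynamoDB,
// ZooKeeper, or any other store that is read by key rather than listed.
final class CheckpointPointerStore(pointerFile: Path) {

  // Record the metadata path of the most recent savepoint or externalized checkpoint.
  def record(metadataPath: String): Unit = {
    val tmp = pointerFile.resolveSibling(pointerFile.getFileName.toString + ".tmp")
    // Write to a temp file first, then move it over the existing pointer.
    Files.write(tmp, metadataPath.getBytes(StandardCharsets.UTF_8))
    Files.move(tmp, pointerFile, StandardCopyOption.REPLACE_EXISTING)
  }

  // Look the path up directly; no directory or bucket listing is involved.
  def latest(): Option[String] =
    if (Files.exists(pointerFile))
      Some(new String(Files.readAllBytes(pointerFile), StandardCharsets.UTF_8).trim)
        .filter(_.nonEmpty)
    else None
}

object Launcher {
  // Build the restart command; fall back to a fresh start when nothing was recorded.
  def restartCommand(store: CheckpointPointerStore, jobJar: String): Seq[String] =
    store.latest() match {
      case Some(path) => Seq("flink", "run", "-s", path, jobJar)
      case None       => Seq("flink", "run", jobJar)
    }
}
```

The launcher would call record(...) whenever it learns a new metadata path and restartCommand(...) when it needs to bring the job back. How the path is obtained is left open here, since the thread does not settle that point.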


Re: Questions about checkpoints/savepoints

Posted by vipul singh <ne...@gmail.com>.
Thanks Aljoscha for the explanations. I was able to recover from the last
externalized checkpoint by using flink run -s <metadata file> <options>

I am curious, are there any options to save the metadata file name to some
other place like DynamoDB etc. at the moment? The reason I am asking is
that, for the launcher code we are writing, we want to ensure that if a Flink
job crashes, we can just start it from the last known externalized checkpoint.
In the present scenario, we have to list the contents of the S3 bucket which
saves the metadata to see the last metadata before failure, and there
might be a window where we run into the read-after-write consistency of S3.
Thoughts?


Re: Questions about checkpoints/savepoints

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

That distinction with externalised checkpoints is a bit of a pitfall, and I'm hoping that we can get rid of it in the next version or the version after that. With that change, all checkpoints would always be externalised, since doing so adds no noticeable overhead.

Regarding read-after-write consistency, you should be fine since the "externalised checkpoint", i.e. the metadata, is only one file. If you know the file path (either from the Flink dashboard or by looking at the S3 bucket) you can restore from it.

Best,
Aljoscha



Re: Questions about checkpoints/savepoints

Posted by vipul singh <ne...@gmail.com>.
Thanks Tony, that was the issue. I was thinking that when we use RocksDB
and provide an S3 path, it uses externalized checkpoints by default. Thanks
so much!

I have one followup question. Say in the above case I terminate the cluster.
Since the metadata is on S3 and not on local storage, does Flink avoid
S3's read-after-write consistency issues? Is that a valid concern, or is
that case handled for externalized checkpoints as well, so that no file
system listing operations are involved when retrieving externalized
checkpoints from S3?

Thanks,
Vipul

Re: Questions about checkpoints/savepoints

Posted by Tony Wei <to...@gmail.com>.
Hi,

Did you enable externalized checkpoints? [1]

Best,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#externalized-checkpoints
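
For reference, a minimal sketch of what enabling externalized checkpoints might look like in a Scala job on Flink 1.3. The object name and the checkpoint interval are hypothetical and not taken from the original post. The directory that receives the checkpoint metadata is assumed to be configured through state.checkpoints.dir in flink-conf.yaml, as discussed in this thread.

```scala
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ExternalizedCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical interval; the original post uses its own CHECKPOINT_TIME_MS constant.
    val checkpointIntervalMs = 60000L
    env.enableCheckpointing(checkpointIntervalMs)

    // Without this call, periodic checkpoints are not externalized,
    // so no metadata file shows up in state.checkpoints.dir.
    // RETAIN_ON_CANCELLATION keeps the externalized checkpoint when the job is cancelled.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Define the job graph and call env.execute(...) here.
  }
}
```

With RETAIN_ON_CANCELLATION the retained checkpoint has to be cleaned up manually once it is no longer needed; DELETE_ON_CANCELLATION is the other cleanup mode offered by the same enum.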


Re: Questions about checkpoints/savepoints

Posted by vipul singh <ne...@gmail.com>.
Thanks Aljoscha for the answer above.

I am experimenting with savepoints and checkpoints on my end, so that we
can build a fault-tolerant application with exactly-once semantics.

I have been able to test various scenarios, but have doubts about one use
case.

My app is running on an EMR cluster, and I am trying to test the case when
an EMR cluster is terminated. I have read that *state.checkpoints.dir* is
responsible for storing metadata information, and links to data files in
*state.backend.fs.checkpointdir*.

For my application I have configured both
*state.backend.fs.checkpointdir* and *state.checkpoints.dir*

Also I have the following in my main app:

env.enableCheckpointing(CHECKPOINT_TIME_MS)

val CHECKPOINT_LOCATION =
s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"

val backend:RocksDBStateBackend =
  new RocksDBStateBackend(CHECKPOINT_LOCATION)

env.setStateBackend(backend)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)


In the application startup logs I can see the values of
*state.backend.fs.checkpointdir* and *state.checkpoints.dir* being loaded.
However, when the checkpoint happens I don't see any content in the metadata
dir. Is there something I am missing? Please let me know. I am using Flink
version 1.3.

Thanks,
Vipul

Re: Questions about checkpoints/savepoints

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Flink does not rely on file system operations to list contents; all necessary file paths are stored in the metadata file, as you guessed. This is the reason savepoints also work with file systems that "only" have read-after-write consistency.

Best,
Aljoscha
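
The idea described above can be illustrated with a small toy sketch in plain Scala. This is not Flink's actual savepoint format or restore code (the real _metadata file is a serialized binary structure); all names here are hypothetical. It only shows the principle that a restore which follows paths recorded in a metadata file never needs a directory listing.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

object MetadataPathsSketch {
  // Write each state file, then a "_metadata" file that records their absolute paths.
  def writeSnapshot(dir: Path, states: Map[String, String]): Path = {
    val paths = states.toSeq.sortBy(_._1).map { case (name, contents) =>
      val file = dir.resolve(name)
      Files.write(file, contents.getBytes(UTF_8))
      file.toAbsolutePath.toString
    }
    Files.write(dir.resolve("_metadata"), paths.asJava, UTF_8)
  }

  // Restore by opening exactly the paths named in the metadata file.
  // No listing of the directory takes place, so stale listings cannot hide a file.
  def restore(metadata: Path): List[String] =
    Files.readAllLines(metadata, UTF_8).asScala.toList.map { p =>
      new String(Files.readAllBytes(Paths.get(p)), UTF_8)
    }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("savepoint-sketch")
    val metadata = writeSnapshot(dir, Map("state-a" -> "1", "state-b" -> "2"))
    println(restore(metadata))
  }
}
```

The only input the restore step needs is the path of the metadata file itself, which matches the point that a savepoint or externalized checkpoint has to be specified explicitly on restart.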
