Posted to user@flink.apache.org by Aftab Ansari <af...@rovio.com> on 2017/07/10 12:20:53 UTC

data loss after implementing checkpoint

Hi,
I am new to Flink and am having trouble implementing checkpointing.

checkpoint related code:

long checkpointInterval = 5000;

StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
// specify backend
// env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/", true));
env.setStateBackend(new FsStateBackend("file:///Users/aftabansari/flink-state/", true));
// enable checkpointing every checkpointInterval milliseconds
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);


When I run the code, I can see flink-state being written on my local
machine. But when I stop the job, wait for a few minutes and restart the
job, it does not pick up from where it left off; it starts from the point
at which I started the job.

Could you point out what I am doing wrong? I am testing it locally from
IntelliJ IDEA. Below is what I see on localhost. Any help would be
appreciated. Thanks
[image: Inline images 1]
Br,
-- 

Aftab Ansari

Analytics Developer

aftab.ansari@rovio.com

Rovio Entertainment Ltd.

Keilaranta 7, FIN - 02150 Espoo, Finland

Mobile: + 358 (0)46 923 3060

www.rovio.com

Re: data loss after implementing checkpoint

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Sridhar,

Stephan already covered the correct sequence of actions in order for your second program
to know its correct starting point.

As far as the active/inactive rules are concerned, as Nico pointed out you have to somehow 
store in the backend which rules are active and which are not upon checkpointing. If not, upon 
recovery your program will not be able to know which rules to apply and which to ignore.
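
The point about rule state can be sketched in plain Java. This is only an illustration of the snapshot/restore idea and deliberately uses no Flink classes: the class name RuleSwitchboard, its methods and the rule ids are all made up for this example. In a real job the two hooks would have to be wired into Flink's managed operator state (for example through the CheckpointedFunction interface) so that the list ends up in the state backend.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical holder for the enable/disable flags that arrive via REST.
class RuleSwitchboard {
    private final Set<String> activeRules = new LinkedHashSet<>();

    void enable(String ruleId) { activeRules.add(ruleId); }

    void disable(String ruleId) { activeRules.remove(ruleId); }

    boolean isActive(String ruleId) { return activeRules.contains(ruleId); }

    // Would be called when a checkpoint is taken. Returns a copy so that
    // later enable/disable calls do not change an already taken snapshot.
    List<String> snapshot() { return new ArrayList<>(activeRules); }

    // Would be called on recovery, before any event is processed.
    void restore(List<String> snapshot) {
        activeRules.clear();
        activeRules.addAll(snapshot);
    }

    public static void main(String[] args) {
        RuleSwitchboard beforeCrash = new RuleSwitchboard();
        beforeCrash.enable("rule-a");
        beforeCrash.enable("rule-b");
        beforeCrash.disable("rule-a");
        List<String> checkpoint = beforeCrash.snapshot();

        // Simulated recovery: a fresh instance only knows what was snapshotted.
        RuleSwitchboard afterRecovery = new RuleSwitchboard();
        afterRecovery.restore(checkpoint);
        System.out.println("rule-a active: " + afterRecovery.isActive("rule-a"));
        System.out.println("rule-b active: " + afterRecovery.isActive("rule-b"));
    }
}
```

Without the snapshot/restore pair, the fresh instance would start with an empty set and every rule would look disabled after recovery.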

Hope this helps,
Kostas


Re: data loss after implementing checkpoint

Posted by Stephan Ewen <se...@apache.org>.
Maybe to clear up some confusion here:

  - Flink recovers from the latest checkpoint after a failure.

  - If you stop/cancel a Flink job and submit the job again, it does not
automatically pick up the latest checkpoint. Flink does not know that the
second program is a continuation of the first program.

  - If you want the second program to resume from where the first program
left off, you need to start it with the option to continue from a
checkpoint/savepoint and pass a path to that checkpoint/savepoint:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html#resuming-from-savepoints
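
As a rough sketch of that last step, the small plain-Java helper below picks the newest file in a directory of retained checkpoint metadata and prints a "bin/flink run -s <path> <jar>" command line for it. The class name ResumeCommand and the assumption that the directory holds nothing but checkpoint metadata files are mine, not something the docs promise, so treat the result as a starting point and check the path before using it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink.
class ResumeCommand {

    // Most recently modified regular file in dir, or empty if there is none.
    // Assumption: every regular file in dir is a checkpoint metadata file.
    static Optional<Path> newestFile(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files
                    .filter(Files::isRegularFile)
                    .max(Comparator.comparing(ResumeCommand::lastModified));
        }
    }

    private static FileTime lastModified(Path file) {
        try {
            return Files.getLastModifiedTime(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds the argument list for resuming a job from the given metadata file.
    static List<String> resumeCommand(Path metadataFile, String jobJar) {
        return Arrays.asList("bin/flink", "run", "-s", metadataFile.toString(), jobJar);
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: ResumeCommand <checkpoint-metadata-dir> <job-jar>");
            return;
        }
        Optional<Path> latest = newestFile(Paths.get(args[0]));
        if (latest.isPresent()) {
            System.out.println(String.join(" ", resumeCommand(latest.get(), args[1])));
        } else {
            System.out.println("no retained checkpoint found in " + args[0]);
        }
    }
}
```

The helper only prints the command; running it is left to the user, since a wrong path would silently start the job from an unintended state.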

Stephan


Re: data loss after implementing checkpoint

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Sridhar,
sorry for not getting back to you earlier; tbh, I'm no expert in this field
either.

I don't see this enabling/disabling of rules in the CEP library overview at
[1]. How do you do this?

You'll probably have to create a stateful operator [2] to store this state in
Flink. Maybe Kostas (cc'd) can shed some more light on this topic or has
some other workaround.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html



Re: data loss after implementing checkpoint

Posted by Sridhar Chellappa <fl...@gmail.com>.
>>>> The CEP library runs ....
Correction; the CEP implemented using the CEP library runs .....


Re: data loss after implementing checkpoint

Posted by Sridhar Chellappa <fl...@gmail.com>.
A follow-up question on this. I have a complex event processor implemented
using the CEP library (1.3.0). The CEP library runs a variety of rules that
are configured (enable/disable rule) via REST APIs.

Now, if my application crashes and recovers (or is cancelled and
restarted), will my configuration (as to which rules are enabled) still
hold? Or do I have to persist the info into a backend?



Re: data loss after implementing checkpoint

Posted by Nico Kruber <ni...@data-artisans.com>.
(back to list)

state.checkpoints.dir is a configuration parameter which you set in the Flink
configuration itself (see [1]). It is used for the checkpoint metadata only
(for the RocksDB and Fs backends), while the checkpoint data itself is stored
in the directory given to the state backend.
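
For illustration, the corresponding entry in flink-conf.yaml could look like the fragment below. The key is the one named in the error message; the directory is a made-up example path and should be replaced with wherever the metadata is meant to live.

```yaml
# Target directory for the metadata of externalized checkpoints.
# The path below is a hypothetical example.
state.checkpoints.dir: file:///Users/aftabansari/flink-checkpoints/
```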


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#directory-structure

On Tuesday, 11 July 2017 10:32:25 CEST Aftab Ansari wrote:
> Thanks Nico,
> I am trying to go for an externalized checkpoint, but the code below throws
> the error: "Caused by: java.lang.IllegalStateException: CheckpointConfig says
> to persist periodic checkpoints, but no checkpoint directory has been
> configured. You can configure configure one via key 'state.checkpoints.dir"
> 
> StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
> // specify backend
> // env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/", true));
> env.setStateBackend(new FsStateBackend("file:///Users/aftabansari/flink-state/", true));
> // enable externalized checkpoints
> env.getCheckpointConfig().enableExternalizedCheckpoints(
>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> 
> After setting the state backend, do I need to configure another directory
> for checkpoints? How can I set this configuration in the main method, like
> I did for the state backend?
> 
> BR,


Re: data loss after implementing checkpoint

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Aftab,
looks like what you want is either an externalized checkpoint with 
RETAIN_ON_CANCELLATION mode [1] or a savepoint [2].

Ordinary checkpoints are deleted when the job is cancelled and only serve as a 
fault tolerance layer in case something goes wrong, i.e. machines fail, so 
that the job can be restarted automatically based on the restart policy. 


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html



Re: data loss after implementing checkpoint

Posted by Kien Truong <du...@gmail.com>.
Hi,

I think you need to create a savepoint and restore from there.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html


Checkpoints are for automatic recovery within the lifetime of a job;
they're deleted when you stop the job manually.

Regards,

Kien

