Posted to user@flink.apache.org by sohimankotia <so...@gmail.com> on 2018/10/16 15:47:09 UTC

Window State is not being store on check-pointing

Hi,

I am using the following setup:

1. Flink 1.4
2. Running the example from the IDE
3. Exactly-once semantics enabled
4. Window aggregation
5. Checkpointing enabled with a 20-second interval
6. RocksDB as the state backend


Workflow:

Kafka Source -> map -> keyBy -> Window(60 Sec) -> ApplyFunction ->
Aggregated Record to Kafka 

Issue:

I am having a problem with checkpointing. If the job has read a few records from Kafka
and the window has not yet been evaluated, a checkpoint is still triggered
and completes successfully.
If I stop the job after 30 seconds (by which time the Kafka checkpoint has completed) and
restart it, all in-flight messages for the window are lost. Flink
is not restoring them from the state backend.

Code is attached.


CheckpointTest1.java
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/CheckpointTest1.java>  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Window State is not being store on check-pointing

Posted by sohimankotia <so...@gmail.com>.
Thanks. That solved the problem.




Re: Window State is not being store on check-pointing

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

Do you mean that you stop your job manually and then start it again?
Checkpoints are meant for recovery from failures and are 1) by default not
persisted across separate job runs (unless you configure them to be
externalized) and 2) not automatically picked up when you start your job.
In your case, where you stop a job and then want to start it with the state
from the previous run, you should use savepoints.

For a more thorough explanation of these concepts, please have a look here [1].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
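
As a rough illustration of point 2) in the reply above, here is a toy model in plain Java. It is not Flink code and uses no Flink API; the names ToyWindowOperator, snapshot and restore are hypothetical and exist only for this sketch. It shows why a fresh run starts with empty window state unless a snapshot is explicitly handed to it, which in Flink corresponds to starting the job from a savepoint or a retained checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: this is NOT Flink code and does not call any Flink API. */
public class SnapshotRestoreSketch {

    /** Hypothetical stand-in for a keyed window operator that buffers records until the window fires. */
    static final class ToyWindowOperator {
        private final Map<String, List<Integer>> buffered = new HashMap<>();

        /** Buffers one record under its key, as a window would before it is evaluated. */
        void add(String key, int value) {
            buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }

        /** Returns a deep copy of the buffered records, standing in for a checkpoint or savepoint. */
        Map<String, List<Integer>> snapshot() {
            Map<String, List<Integer>> copy = new HashMap<>();
            buffered.forEach((k, v) -> copy.put(k, new ArrayList<>(v)));
            return copy;
        }

        /** Replaces the current buffer with the contents of a snapshot. */
        void restore(Map<String, List<Integer>> snapshot) {
            buffered.clear();
            snapshot.forEach((k, v) -> buffered.put(k, new ArrayList<>(v)));
        }

        /** Total number of records currently buffered across all keys. */
        int bufferedCount() {
            return buffered.values().stream().mapToInt(List::size).sum();
        }
    }

    public static void main(String[] args) {
        // First run: three records arrive, the window has not fired yet, a snapshot is taken.
        ToyWindowOperator firstRun = new ToyWindowOperator();
        firstRun.add("a", 1);
        firstRun.add("a", 2);
        firstRun.add("b", 3);
        Map<String, List<Integer>> snap = firstRun.snapshot();

        // Restart without pointing at the snapshot: the in-flight records are gone.
        ToyWindowOperator freshRun = new ToyWindowOperator();
        System.out.println("fresh start: " + freshRun.bufferedCount());

        // Restart that is explicitly given the snapshot: the records are back.
        ToyWindowOperator restoredRun = new ToyWindowOperator();
        restoredRun.restore(snap);
        System.out.println("restored start: " + restoredRun.bufferedCount());
    }
}
```

Running main prints "fresh start: 0" followed by "restored start: 3". The snapshot existing somewhere is not enough; the new run has to be told to start from it.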

On 17/10/2018 05:37, sohimankotia wrote:
> Hi Hequn,
>
> I tried the following:
>
>     Configuration conf = new Configuration();
>     conf.setString("state.checkpoints.dir", "file:///home/sohanvir/Desktop/flink/checkpoints2");
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
>     CheckpointConfig config = env.getCheckpointConfig();
>     config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     env.setParallelism(1);
>     env.enableCheckpointing(20 * SECOND);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.setStateBackend(new RocksDBStateBackend("file:///home/sohanvir/Desktop/flink/checkpoints"));
>
> The issue still persists.
>
> Any ideas?


Re: Window State is not being store on check-pointing

Posted by sohimankotia <so...@gmail.com>.
Hi Hequn,

I tried the following:

    // Directory where externalized checkpoint metadata is written
    Configuration conf = new Configuration();
    conf.setString("state.checkpoints.dir", "file:///home/sohanvir/Desktop/flink/checkpoints2");
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
    // Retain externalized checkpoints when the job is cancelled
    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setParallelism(1);
    // SECOND is a constant from the attached class (presumably 1000, i.e. milliseconds)
    env.enableCheckpointing(20 * SECOND);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new RocksDBStateBackend("file:///home/sohanvir/Desktop/flink/checkpoints"));

The issue still persists.

Any ideas?





Re: Window State is not being store on check-pointing

Posted by Hequn Cheng <ch...@gmail.com>.
Hi sohimankotia,

Have you enabled externalized checkpoints [1]?

> // enable externalized checkpoints which are retained after job cancellation
> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
