You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Puneet Kinra <pu...@customercentria.com> on 2020/03/03 10:04:24 UTC

Unable to recover from savepoint and checkpoint

Hi

I am stuck on a simple program involving checkpointing. The Flink version I
am using is 1.10.0.

*Here I have created DummySource for testing*

*DummySource*
package com.nudge.stateful;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{

/**
*
*/
private static final long serialVersionUID = 1L;
private Boolean isRunning=true;


public BeaconSource() {
super();
// TODO Auto-generated constructor stub
}



public void cancel() {
// TODO Auto-generated method stub

this.isRunning=false;

}

public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception {
// TODO Auto-generated method stub
while(isRunning) {
Thread.sleep(30000L);
arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
}
}

}


---------------------------------------------------------------------------------------
*KeyedProcessFunction (registers the timer and sets the status flag to true
so that the timer is registered only once per key)*


package com.nudge.stateful;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import scala.collection.mutable.LinkedHashMap;



import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class TimeProcessTrigger extends
KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{

/**
*
*/
private static final long serialVersionUID = 1L;
/**
*
*/

private transient ValueState<Boolean> contacthistory;
private static final  Long  ONE_MINUTE=60000L;










@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple,
Tuple2<Long, String>, String>.OnTimerContext ctx,
Collector<String> out) throws Exception {
// TODO Auto-generated method stub
super.onTimer(timestamp, ctx, out);
System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
}






@Override
public void open(Configuration parameters) throws Exception {
// TODO Auto-generated method stub
super.open(parameters);


ValueStateDescriptor<Boolean> descriptor = new
ValueStateDescriptor<Boolean>(
"contact-history", // the state name
Boolean.class); // type information

this.contacthistory=getRuntimeContext().getState(descriptor);
}






@Override
public void processElement(Tuple2<Long, String> input,
KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
Collector<String> collect)
throws Exception {
// TODO Auto-generated method stub


System.out.println(this.contacthistory.value());
Boolean value = this.contacthistory.value();
if(value==null) {
Long currentTime = ctx.timerService().currentProcessingTime();
Long regTimer=currentTime+ONE_MINUTE;
System.out.println("Updating the flag and registering the timer
@:"+regTimer);
this.contacthistory.update(true);
ctx.timerService().registerProcessingTimeTimer(regTimer);

}else {
System.out.println("Timer has already register for this key");
}
}

}


-------------------------------------------------
*Main App*

package com.nudge.stateful;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.indiabulls.nudge.stateful.*;

public class App
{
public static void main( String[] args ) throws Exception
{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000);
env.setParallelism(1);
// // advanced options:
// // set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// // make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
// // checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// // allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// // enable externalized checkpoints which are retained after job
cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// // allow job recovery fallback to checkpoint when there is a more recent
savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

SingleOutputStreamOperator<Tuple2<Long, String>> AMQSource =
env.addSource(new BeaconSource())
.name("AMQSource");
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(1);
KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = AMQSource.keyBy(0);
SingleOutputStreamOperator<String> processedStream =
keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
processedStream.print();
env.execute();
}
}
-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
<pu...@customercentria.com>*

*e-mail :puneet.kinra@customercentria.com
<pu...@customercentria.com>*

Re: Unable to recover from savepoint and checkpoint

Posted by Puneet Kinra <pu...@customercentria.com>.
I killed the task manager and job manager forcefully with the kill -9 command,
and while recovering
I am checking the flag returned by the isRestored method in the
initializeState function.
Anyway, I figured out the issue and fixed it. Thanks for the support.

On Tue, Mar 3, 2020 at 7:24 PM Gary Yao <ga...@apache.org> wrote:

> Hi Puneet,
>
> Can you describe how you validated that the state is not restored
> properly? Specifically, how did you introduce faults to the cluster?
>
> Best,
> Gary
>
> On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <
> puneet.kinra@customercentria.com> wrote:
>
>> Sorry for the missed information
>>
>> On recovery the value is coming as false instead of true, state.backend
>> has been configured in flink-conf.yaml  along the
>> the path for checkpointing and savepoint.
>>
>> On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <
>> puneet.kinra@customercentria.com> wrote:
>>
>>> Hi
>>>
>>> Stuck with the simple program regarding the checkpointing Flink version
>>> I am using 1.10.0
>>>
>>> *Here I have created DummySource for testing*
>>>
>>> *DummySource*
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>
>>> public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{
>>>
>>> /**
>>> *
>>> */
>>> private static final long serialVersionUID = 1L;
>>> private Boolean isRunning=true;
>>>
>>>
>>> public BeaconSource() {
>>> super();
>>> // TODO Auto-generated constructor stub
>>> }
>>>
>>>
>>>
>>> public void cancel() {
>>> // TODO Auto-generated method stub
>>>
>>> this.isRunning=false;
>>>
>>> }
>>>
>>> public void run(SourceContext<Tuple2<Long,String>> arg0) throws
>>> Exception {
>>> // TODO Auto-generated method stub
>>> while(isRunning) {
>>> Thread.sleep(30000L);
>>> arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
>>> }
>>> }
>>>
>>> }
>>>
>>>
>>>
>>> ---------------------------------------------------------------------------------------
>>> *KeyedProcessFunction (to register the timer and update the status to
>>> true so that only one-time trigger should)*
>>>
>>>
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.common.functions.IterationRuntimeContext;
>>> import org.apache.flink.api.common.functions.RuntimeContext;
>>> import org.apache.flink.api.common.state.ListState;
>>> import org.apache.flink.api.common.state.ListStateDescriptor;
>>> import org.apache.flink.api.common.state.ValueState;
>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.TypeHint;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.java.tuple.Tuple;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.tuple.Tuple3;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>> import org.apache.flink.util.Collector;
>>>
>>> import com.google.gson.JsonObject;
>>> import com.google.gson.JsonParser;
>>>
>>> import scala.collection.mutable.LinkedHashMap;
>>>
>>>
>>>
>>> import java.util.HashMap;
>>> import java.util.Map;
>>> import java.util.Map.Entry;
>>> import java.util.Set;
>>>
>>> public class TimeProcessTrigger extends
>>> KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{
>>>
>>> /**
>>> *
>>> */
>>> private static final long serialVersionUID = 1L;
>>> /**
>>> *
>>> */
>>>
>>> private transient ValueState<Boolean> contacthistory;
>>> private static final  Long  ONE_MINUTE=60000L;
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> @Override
>>> public void onTimer(long timestamp, KeyedProcessFunction<Tuple,
>>> Tuple2<Long, String>, String>.OnTimerContext ctx,
>>> Collector<String> out) throws Exception {
>>> // TODO Auto-generated method stub
>>> super.onTimer(timestamp, ctx, out);
>>> System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> @Override
>>> public void open(Configuration parameters) throws Exception {
>>> // TODO Auto-generated method stub
>>> super.open(parameters);
>>>
>>>
>>> ValueStateDescriptor<Boolean> descriptor = new
>>> ValueStateDescriptor<Boolean>(
>>> "contact-history", // the state name
>>> Boolean.class); // type information
>>>
>>> this.contacthistory=getRuntimeContext().getState(descriptor);
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> @Override
>>> public void processElement(Tuple2<Long, String> input,
>>> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
>>> Collector<String> collect)
>>> throws Exception {
>>> // TODO Auto-generated method stub
>>>
>>>
>>> System.out.println(this.contacthistory.value());
>>> Boolean value = this.contacthistory.value();
>>> if(value==null) {
>>> Long currentTime = ctx.timerService().currentProcessingTime();
>>> Long regTimer=currentTime+ONE_MINUTE;
>>> System.out.println("Updating the flag and registering the timer
>>> @:"+regTimer);
>>> this.contacthistory.update(true);
>>> ctx.timerService().registerProcessingTimeTimer(regTimer);
>>>
>>> }else {
>>> System.out.println("Timer has already register for this key");
>>> }
>>> }
>>>
>>> }
>>>
>>>
>>> -------------------------------------------------
>>> *Main App*
>>>
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.java.functions.KeySelector;
>>> import org.apache.flink.api.java.tuple.Tuple;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.utils.ParameterTool;
>>> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>> import org.apache.flink.streaming.api.datastream.KeyedStream;
>>> import
>>> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>> import
>>> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> import com.google.gson.JsonObject;
>>> import com.google.gson.JsonParser;
>>> import com.indiabulls.nudge.stateful.*;
>>>
>>> public class App
>>> {
>>> public static void main( String[] args ) throws Exception
>>> {
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.enableCheckpointing(30000);
>>> env.setParallelism(1);
>>> // // advanced options:
>>> // // set mode to exactly-once (this is the default)
>>>
>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>> // // make sure 500 ms of progress happen between checkpoints
>>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
>>> // // checkpoints have to complete within one minute, or are discarded
>>> env.getCheckpointConfig().setCheckpointTimeout(60000);
>>> // // allow only one checkpoint to be in progress at the same time
>>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>> // // enable externalized checkpoints which are retained after job
>>> cancellation
>>>
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>> // // allow job recovery fallback to checkpoint when there is a more
>>> recent savepoint
>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>>>
>>> SingleOutputStreamOperator<Tuple2<Long, String>> AMQSource =
>>> env.addSource(new BeaconSource())
>>> .name("AMQSource");
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>>> env.setParallelism(1);
>>> KeyedStream<Tuple2<Long, String>, Tuple> keyedValues =
>>> AMQSource.keyBy(0);
>>> SingleOutputStreamOperator<String> processedStream =
>>> keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
>>> processedStream.print();
>>> env.execute();
>>> }
>>> }
>>> --
>>> *Cheers *
>>>
>>> *Puneet Kinra*
>>>
>>> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
>>> <pu...@customercentria.com>*
>>>
>>> *e-mail :puneet.kinra@customercentria.com
>>> <pu...@customercentria.com>*
>>>
>>>
>>>
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
>> <pu...@customercentria.com>*
>>
>> *e-mail :puneet.kinra@customercentria.com
>> <pu...@customercentria.com>*
>>
>>
>>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
<pu...@customercentria.com>*

*e-mail :puneet.kinra@customercentria.com
<pu...@customercentria.com>*

Re: Unable to recover from savepoint and checkpoint

Posted by Gary Yao <ga...@apache.org>.
Hi Puneet,

Can you describe how you validated that the state is not restored properly?
Specifically, how did you introduce faults to the cluster?

Best,
Gary

On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <
puneet.kinra@customercentria.com> wrote:

> Sorry for the missed information
>
> On recovery the value is coming as false instead of true, state.backend
> has been configured in flink-conf.yaml  along the
> the path for checkpointing and savepoint.
>
> On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <
> puneet.kinra@customercentria.com> wrote:
>
>> Hi
>>
>> Stuck with the simple program regarding the checkpointing Flink version I
>> am using 1.10.0
>>
>> *Here I have created DummySource for testing*
>>
>> *DummySource*
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>
>> public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{
>>
>> /**
>> *
>> */
>> private static final long serialVersionUID = 1L;
>> private Boolean isRunning=true;
>>
>>
>> public BeaconSource() {
>> super();
>> // TODO Auto-generated constructor stub
>> }
>>
>>
>>
>> public void cancel() {
>> // TODO Auto-generated method stub
>>
>> this.isRunning=false;
>>
>> }
>>
>> public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception
>> {
>> // TODO Auto-generated method stub
>> while(isRunning) {
>> Thread.sleep(30000L);
>> arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
>> }
>> }
>>
>> }
>>
>>
>>
>> ---------------------------------------------------------------------------------------
>> *KeyedProcessFunction (to register the timer and update the status to
>> true so that only one-time trigger should)*
>>
>>
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.common.functions.IterationRuntimeContext;
>> import org.apache.flink.api.common.functions.RuntimeContext;
>> import org.apache.flink.api.common.state.ListState;
>> import org.apache.flink.api.common.state.ListStateDescriptor;
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.util.Collector;
>>
>> import com.google.gson.JsonObject;
>> import com.google.gson.JsonParser;
>>
>> import scala.collection.mutable.LinkedHashMap;
>>
>>
>>
>> import java.util.HashMap;
>> import java.util.Map;
>> import java.util.Map.Entry;
>> import java.util.Set;
>>
>> public class TimeProcessTrigger extends
>> KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{
>>
>> /**
>> *
>> */
>> private static final long serialVersionUID = 1L;
>> /**
>> *
>> */
>>
>> private transient ValueState<Boolean> contacthistory;
>> private static final  Long  ONE_MINUTE=60000L;
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> @Override
>> public void onTimer(long timestamp, KeyedProcessFunction<Tuple,
>> Tuple2<Long, String>, String>.OnTimerContext ctx,
>> Collector<String> out) throws Exception {
>> // TODO Auto-generated method stub
>> super.onTimer(timestamp, ctx, out);
>> System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
>> }
>>
>>
>>
>>
>>
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>> // TODO Auto-generated method stub
>> super.open(parameters);
>>
>>
>> ValueStateDescriptor<Boolean> descriptor = new
>> ValueStateDescriptor<Boolean>(
>> "contact-history", // the state name
>> Boolean.class); // type information
>>
>> this.contacthistory=getRuntimeContext().getState(descriptor);
>> }
>>
>>
>>
>>
>>
>>
>> @Override
>> public void processElement(Tuple2<Long, String> input,
>> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
>> Collector<String> collect)
>> throws Exception {
>> // TODO Auto-generated method stub
>>
>>
>> System.out.println(this.contacthistory.value());
>> Boolean value = this.contacthistory.value();
>> if(value==null) {
>> Long currentTime = ctx.timerService().currentProcessingTime();
>> Long regTimer=currentTime+ONE_MINUTE;
>> System.out.println("Updating the flag and registering the timer
>> @:"+regTimer);
>> this.contacthistory.update(true);
>> ctx.timerService().registerProcessingTimeTimer(regTimer);
>>
>> }else {
>> System.out.println("Timer has already register for this key");
>> }
>> }
>>
>> }
>>
>>
>> -------------------------------------------------
>> *Main App*
>>
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.java.functions.KeySelector;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.utils.ParameterTool;
>> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>> import org.apache.flink.streaming.api.CheckpointingMode;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.datastream.KeyedStream;
>> import
>> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>> import
>> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> import com.google.gson.JsonObject;
>> import com.google.gson.JsonParser;
>> import com.indiabulls.nudge.stateful.*;
>>
>> public class App
>> {
>> public static void main( String[] args ) throws Exception
>> {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(30000);
>> env.setParallelism(1);
>> // // advanced options:
>> // // set mode to exactly-once (this is the default)
>>
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> // // make sure 500 ms of progress happen between checkpoints
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
>> // // checkpoints have to complete within one minute, or are discarded
>> env.getCheckpointConfig().setCheckpointTimeout(60000);
>> // // allow only one checkpoint to be in progress at the same time
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> // // enable externalized checkpoints which are retained after job
>> cancellation
>>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> // // allow job recovery fallback to checkpoint when there is a more
>> recent savepoint
>> env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>>
>> SingleOutputStreamOperator<Tuple2<Long, String>> AMQSource =
>> env.addSource(new BeaconSource())
>> .name("AMQSource");
>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>> env.setParallelism(1);
>> KeyedStream<Tuple2<Long, String>, Tuple> keyedValues =
>> AMQSource.keyBy(0);
>> SingleOutputStreamOperator<String> processedStream =
>> keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
>> processedStream.print();
>> env.execute();
>> }
>> }
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
>> <pu...@customercentria.com>*
>>
>> *e-mail :puneet.kinra@customercentria.com
>> <pu...@customercentria.com>*
>>
>>
>>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
> <pu...@customercentria.com>*
>
> *e-mail :puneet.kinra@customercentria.com
> <pu...@customercentria.com>*
>
>
>

Re: Unable to recover from savepoint and checkpoint

Posted by Puneet Kinra <pu...@customercentria.com>.
Sorry for the missed information

On recovery the value is coming as false instead of true. state.backend has
been configured in flink-conf.yaml along with
the paths for checkpoints and savepoints.

On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <
puneet.kinra@customercentria.com> wrote:

> Hi
>
> Stuck with the simple program regarding the checkpointing Flink version I
> am using 1.10.0
>
> *Here I have created DummySource for testing*
>
> *DummySource*
> package com.nudge.stateful;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{
>
> /**
> *
> */
> private static final long serialVersionUID = 1L;
> private Boolean isRunning=true;
>
>
> public BeaconSource() {
> super();
> // TODO Auto-generated constructor stub
> }
>
>
>
> public void cancel() {
> // TODO Auto-generated method stub
>
> this.isRunning=false;
>
> }
>
> public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception {
> // TODO Auto-generated method stub
> while(isRunning) {
> Thread.sleep(30000L);
> arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
> }
> }
>
> }
>
>
>
> ---------------------------------------------------------------------------------------
> *KeyedProcessFunction (to register the timer and update the status to true
> so that only one-time trigger should)*
>
>
> package com.nudge.stateful;
>
> import org.apache.flink.api.common.functions.IterationRuntimeContext;
> import org.apache.flink.api.common.functions.RuntimeContext;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> import com.google.gson.JsonObject;
> import com.google.gson.JsonParser;
>
> import scala.collection.mutable.LinkedHashMap;
>
>
>
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Map.Entry;
> import java.util.Set;
>
> public class TimeProcessTrigger extends
> KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{
>
> /**
> *
> */
> private static final long serialVersionUID = 1L;
> /**
> *
> */
>
> private transient ValueState<Boolean> contacthistory;
> private static final  Long  ONE_MINUTE=60000L;
>
>
>
>
>
>
>
>
>
>
> @Override
> public void onTimer(long timestamp, KeyedProcessFunction<Tuple,
> Tuple2<Long, String>, String>.OnTimerContext ctx,
> Collector<String> out) throws Exception {
> // TODO Auto-generated method stub
> super.onTimer(timestamp, ctx, out);
> System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
> }
>
>
>
>
>
>
> @Override
> public void open(Configuration parameters) throws Exception {
> // TODO Auto-generated method stub
> super.open(parameters);
>
>
> ValueStateDescriptor<Boolean> descriptor = new
> ValueStateDescriptor<Boolean>(
> "contact-history", // the state name
> Boolean.class); // type information
>
> this.contacthistory=getRuntimeContext().getState(descriptor);
> }
>
>
>
>
>
>
> @Override
> public void processElement(Tuple2<Long, String> input,
> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
> Collector<String> collect)
> throws Exception {
> // TODO Auto-generated method stub
>
>
> System.out.println(this.contacthistory.value());
> Boolean value = this.contacthistory.value();
> if(value==null) {
> Long currentTime = ctx.timerService().currentProcessingTime();
> Long regTimer=currentTime+ONE_MINUTE;
> System.out.println("Updating the flag and registering the timer
> @:"+regTimer);
> this.contacthistory.update(true);
> ctx.timerService().registerProcessingTimeTimer(regTimer);
>
> }else {
> System.out.println("Timer has already register for this key");
> }
> }
>
> }
>
>
> -------------------------------------------------
> *Main App*
>
> package com.nudge.stateful;
>
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.KeyedStream;
> import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import com.google.gson.JsonObject;
> import com.google.gson.JsonParser;
> import com.indiabulls.nudge.stateful.*;
>
> public class App
> {
> public static void main( String[] args ) throws Exception
> {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(30000);
> env.setParallelism(1);
> // // advanced options:
> // // set mode to exactly-once (this is the default)
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> // // make sure 500 ms of progress happen between checkpoints
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
> // // checkpoints have to complete within one minute, or are discarded
> env.getCheckpointConfig().setCheckpointTimeout(60000);
> // // allow only one checkpoint to be in progress at the same time
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> // // enable externalized checkpoints which are retained after job
> cancellation
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> // // allow job recovery fallback to checkpoint when there is a more
> recent savepoint
> env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>
> SingleOutputStreamOperator<Tuple2<Long, String>> AMQSource =
> env.addSource(new BeaconSource())
> .name("AMQSource");
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> env.setParallelism(1);
> KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = AMQSource.keyBy(0);
> SingleOutputStreamOperator<String> processedStream =
> keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
> processedStream.print();
> env.execute();
> }
> }
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
> <pu...@customercentria.com>*
>
> *e-mail :puneet.kinra@customercentria.com
> <pu...@customercentria.com>*
>
>
>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
<pu...@customercentria.com>*

*e-mail :puneet.kinra@customercentria.com
<pu...@customercentria.com>*