Posted to user@flink.apache.org by chrisr123 <ch...@gmail.com> on 2018/10/18 01:51:11 UTC
Restart from checkpoint after program failure
Hi Folks,
I'm trying to restart my program from a checkpoint after it has failed (the
restart strategy was tried and its attempts exhausted), but the restored state
is not being picked up. What am I doing wrong?
*Summary*
I'm using a very simple app on a single node just to learn checkpointing.
The app reads from a socket stream, with netcat (nc) as the source, and I
deliberately send in some "bad" data to make it throw an exception. The app
uses a simple file URL as its checkpoint backend.
*Checkpoint Backend*
// specified in program:
env.setStateBackend((StateBackend)new
FsStateBackend("file:///home/hadoop/flink/checkpoints/"));
For the restart strategy, I specify 3 attempts with a 5-second delay between
attempts:
// specified in program:
int restartAttempts = 3;
int restartDelaySeconds = 5;
long delayBetweenRestarts = restartDelaySeconds*1000;
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts,
delayBetweenRestarts));
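To make the restart semantics concrete, here is a stdlib-only Java model of a fixed-delay policy. It is a hypothetical sketch, not Flink's implementation (FixedDelayRestartSketch and runWithFixedDelay are illustrative names): a failing task is retried up to restartAttempts times with delayBetweenRestarts in between, and once that budget is used up the failure propagates. That is the point at which the real job ends up failed and has to be resubmitted by hand.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical, stdlib-only model of a fixed-delay restart policy:
 * the task is retried up to maxRestarts times, sleeping delayMillis
 * between attempts. Once the budget is exhausted, the last failure is
 * rethrown (in Flink terms, the job would end up FAILED).
 */
public class FixedDelayRestartSketch {

    public static <T> T runWithFixedDelay(Callable<T> task, int maxRestarts, long delayMillis)
            throws Exception {
        int restarts = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (restarts >= maxRestarts) {
                    // Restart budget exhausted: give up and surface the failure.
                    throw e;
                }
                restarts++;
                Thread.sleep(delayMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds, which stays within a budget of 3 restarts.
        String result = runWithFixedDelay(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("bad input");
            }
            return "ok after " + calls[0] + " attempts";
        }, 3, 10L);
        System.out.println(result); // ok after 3 attempts
    }
}
```

With 3 restarts allowed, a task that keeps failing is invoked 4 times in total (the first run plus 3 restarts) before the exception escapes.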
*App Logic:*
All the application does is parse each line as a key,Integer pair and output
the accumulated sum to stdout (see the source below).
If I start nc -l 9999 and type values like these, it works fine:
key1,5
key1,3
key1,4
However, if I type in "junk", the program throws an exception trying to parse
'junk' as an integer:
key1,junk
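For reference, the job's logic can be modelled without Flink in a few lines of plain Java. This is a hypothetical sketch (RunningSumSketch is not part of the original program): the per-key map plays the role of the keyed state that checkpoints persist, and the parsing mirrors MessageParser, so "key1,junk" fails with a NumberFormatException from Long.valueOf, just as in the job.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stdlib-only model of the job: parse "key,value" lines,
 * keep a running sum per key, and emit the updated sum after every line
 * (roughly what keyBy(0).sum(1) does in the Flink program).
 */
public class RunningSumSketch {

    /** Parses one line like MessageParser; "key1,junk" throws NumberFormatException. */
    public static Map.Entry<String, Long> parse(String line) {
        String[] tokens = line.toLowerCase().split(",");
        return new AbstractMap.SimpleEntry<>(tokens[0], Long.valueOf(tokens[1]));
    }

    /** Returns one "key=sum" entry per input line. */
    public static List<String> runningSums(List<String> lines) {
        // Per-key running totals: the state a checkpoint would need to restore.
        Map<String, Long> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            Map.Entry<String, Long> kv = parse(line);
            long sum = state.merge(kv.getKey(), kv.getValue(), Long::sum);
            out.add(kv.getKey() + "=" + sum);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runningSums(Arrays.asList("key1,5", "key1,3", "key1,4")));
        // [key1=5, key1=8, key1=12]
        try {
            runningSums(Arrays.asList("key1,5", "key1,junk"));
        } catch (NumberFormatException e) {
            System.out.println("parse failed: " + e.getMessage());
        }
    }
}
```

Losing the job after the restarts are exhausted means losing this in-memory map; restoring it is exactly what resuming from a retained checkpoint is for.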
When the application fails, nc also stops. If I restart nc before all 3
restart attempts have been used up, everything is fine: the program restarts
and picks up its state where it left off.
So after all the restarts have been tried and have failed, I want to restart
my program manually and pick up where I left off. Since I specify the
checkpoint backend in the program, I thought it would just pick up the state
from there. Then I tried passing in the backend using the -s parameter to my
program, but that doesn't work either:
flink -c <class> <jar> -s c:\home\hadoop\flink\checkpoints
*App Source:*
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ComputeSumFaultTolerant {
    public static void main(String[] args) throws Exception {
        // Execution Environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parms = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(parms);
        String host = "localhost";
        int port = 9999;
        System.out.println("ComputeSumFaultTolerant BEGIN");
        // Setup Checkpoint and Retry
        String checkpointBackendURL = "file:///home/hadoop/flink/checkpoints/";
        Utils.configureCheckpoint(env, checkpointBackendURL);
        Utils.configureRestart(env);
        // Get Our Raw Data Stream: parse "key,value" lines, keep a running sum per key
        DataStream<Tuple2<String, Long>> eventStream = env
                .socketTextStream(host, port)
                .map(new MessageParser())
                .keyBy(0)
                .sum(1);
        eventStream.print();
        // Execute
        env.execute("ComputeSumFaultTolerant");
    }

    private static class MessageParser implements
            MapFunction<String, Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> map(String input) throws Exception {
            // "key1,5" -> ("key1", 5L); "key1,junk" throws NumberFormatException
            String[] tokens = input.toLowerCase().split(",");
            String key = tokens[0];
            Long value = Long.valueOf(tokens[1]);
            return new Tuple2<String, Long>(key, value);
        }
    }
}
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Utils {
    public static void configureCheckpoint(StreamExecutionEnvironment env,
            String checkpointBackend) throws Exception {
        // Set Up Checkpoints: trigger one every 5 seconds
        env.enableCheckpointing(5000L);
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
        // checkpoints have to complete within 10 seconds, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        // Checkpoint Back-end
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointBackend));
        System.out.println("CHECKPOINT IS EXTERNALIZED");
        // Keep completed checkpoints when the job is cancelled (they are also kept when it fails)
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        System.out.println("External enabled=" +
                env.getCheckpointConfig().isExternalizedCheckpointsEnabled());
    }

    public static void configureRestart(StreamExecutionEnvironment env) throws Exception {
        // Restart Strategy
        // Fixed Delay: 3 attempts, 5 seconds apart
        int restartAttempts = 3;
        int restartDelaySeconds = 5;
        long delayBetweenRestarts = restartDelaySeconds * 1000L;
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts,
                delayBetweenRestarts));
        // Failure Rate Restart (alternative). Left disabled: a second
        // setRestartStrategy() call would replace the fixed-delay strategy above.
        // int failureRate = 3;
        // Time failureInterval = Time.of(5, TimeUnit.MINUTES);
        // Time delayInterval = Time.of(5, TimeUnit.SECONDS);
        // env.setRestartStrategy(RestartStrategies.failureRateRestart(failureRate,
        //         failureInterval, delayInterval));
        // No Restart
        // env.setRestartStrategy(RestartStrategies.noRestart());
    }
}
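configureRestart also sketches a failure-rate strategy (3 failures per 5 minutes, 5 seconds between restarts). Note that setRestartStrategy simply stores one strategy, so whichever call runs last wins. The stdlib-only model below is a hypothetical illustration loosely based on the documented behaviour, not Flink's code (FailureRateSketch and onFailure are made-up names): the job is restarted after each failure until more than failureRate failures fall inside the sliding failureInterval window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical stdlib-only model of a failure-rate restart policy:
 * restart after each failure unless maxFailuresPerInterval restarts
 * already happened within the last intervalMillis.
 */
public class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    // Timestamps of recent restarts, oldest first.
    private final Deque<Long> restartTimes = new ArrayDeque<>();

    public FailureRateSketch(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /** Called on each failure at time nowMillis; returns true if the job should restart. */
    public boolean onFailure(long nowMillis) {
        // Drop restarts that have slid out of the window.
        while (!restartTimes.isEmpty() && nowMillis - restartTimes.peekFirst() > intervalMillis) {
            restartTimes.pollFirst();
        }
        if (restartTimes.size() >= maxFailuresPerInterval) {
            return false; // rate exceeded: the job fails for good
        }
        restartTimes.addLast(nowMillis);
        return true;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        FailureRateSketch policy = new FailureRateSketch(3, 5 * minute);
        long[] failures = {0, minute, 2 * minute, 3 * minute};
        for (long t : failures) {
            System.out.println("failure at " + t / minute + " min -> restart=" + policy.onFailure(t));
        }
        // restart=true for the first three failures, false for the fourth
    }
}
```

Under this model, four "junk" lines sent within five minutes would exhaust the policy, while failures spread further apart would keep being restarted.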
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Restart from checkpoint after program failure
Posted by Paul Lam <pa...@gmail.com>.
Hi,
I think you need to specify the directory of a concrete checkpoint rather than the root checkpoint directory to restore the state. The directory name should look like chk-${id}.
The job ID changes when you resubmit the job, so the JobManager cannot recognize the checkpoint retained by the previous submission, even though you are using the same checkpoint root directory.
Best,
Paul Lam
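Following this advice, the path passed to -s should point at a single retained checkpoint (a chk-<n> directory under the old job's ID), not at the checkpoint root. The Flink CLI also expects its options before the jar, as in flink run -s <path> -c <class> <jar>; anything after the jar is handed to the program as an argument. The helper below is a hypothetical stdlib-only sketch for picking that directory: it assumes the <root>/<job-id>/chk-<n>/_metadata layout used by filesystem checkpoint storage in recent Flink versions, so verify the layout against your own checkpoint directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

/**
 * Hypothetical helper: given a job's checkpoint directory (<root>/<job-id>),
 * return the chk-<n> directory with the highest n that contains a _metadata
 * file, i.e. a candidate path for "flink run -s".
 */
public class LatestCheckpointFinder {
    private static final Pattern CHK = Pattern.compile("chk-(\\d+)");

    public static Optional<Path> latestCheckpoint(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> CHK.matcher(p.getFileName().toString()).matches())
                    // Only completed checkpoints have written their metadata file.
                    .filter(p -> Files.exists(p.resolve("_metadata")))
                    // Compare numerically so that chk-10 beats chk-9.
                    .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointId));
        }
    }

    private static long checkpointId(Path p) {
        Matcher m = CHK.matcher(p.getFileName().toString());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a checkpoint directory: " + p);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) throws IOException {
        // args[0]: the old job's directory, e.g. /home/hadoop/flink/checkpoints/<job-id>
        Path jobDir = Paths.get(args[0]);
        System.out.println(latestCheckpoint(jobDir)
                .map(Path::toString)
                .orElse("no completed checkpoint found"));
    }
}
```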